From 4c1ea57d0e2c2ac5a115108e3685e58cacbceeb4 Mon Sep 17 00:00:00 2001 From: sageweil Date: Tue, 26 Jun 2007 22:35:23 +0000 Subject: [PATCH] * fixed link/unlink recovery behavior * fixed up slave request recovery handling in general * unlink now reanchors when moving ots tray * fixed rename replication of straydn git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1445 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/cephmds2/TODO | 38 ++- .../sage/cephmds2/client/SyntheticClient.cc | 2 +- branches/sage/cephmds2/mds/CDentry.h | 18 +- branches/sage/cephmds2/mds/CInode.cc | 7 +- branches/sage/cephmds2/mds/MDCache.cc | 309 ++++++++++++++---- branches/sage/cephmds2/mds/MDCache.h | 59 ++-- branches/sage/cephmds2/mds/MDS.cc | 2 +- branches/sage/cephmds2/mds/Server.cc | 198 ++++++----- branches/sage/cephmds2/mds/Server.h | 10 +- .../sage/cephmds2/mds/events/ESlaveUpdate.h | 7 +- branches/sage/cephmds2/mds/journal.cc | 33 +- .../sage/cephmds2/messages/MDiscoverReply.h | 8 +- .../sage/cephmds2/messages/MMDSImportMap.h | 12 +- .../sage/cephmds2/messages/MMDSResolveAck.h | 56 ++++ .../sage/cephmds2/messages/MMDSSlaveRequest.h | 13 +- branches/sage/cephmds2/msg/Message.cc | 4 + branches/sage/cephmds2/msg/Message.h | 4 +- .../sage/cephmds2/script/find_auth_pins.pl | 17 +- 18 files changed, 564 insertions(+), 233 deletions(-) create mode 100644 branches/sage/cephmds2/messages/MMDSResolveAck.h diff --git a/branches/sage/cephmds2/TODO b/branches/sage/cephmds2/TODO index 97561d1bed1da..44f1c36f58f69 100644 --- a/branches/sage/cephmds2/TODO +++ b/branches/sage/cephmds2/TODO @@ -47,28 +47,32 @@ sage doc sage mds -- slave request cleanup on failure - - flag request, and discard on re-dispatch? (cuz it'll be waiting on random stuff) - - - recovering node needs to know what slave ops committed - - either commit or abort - - if master op still exists, then ABORT. - - if master op dne, then COMMIT. - - surviving node needs to - - wait for log to flush (commits to finish), then - - for uncommitted requests, - - include reqid in resolve message to recovering node - - remove failed from witnesses, waiting_on_slave, and - - redispatch - - somehow wait for needed peers to recover... +- unlink needs to adjust anchortable + +/- rename_prep needs to do a CDentryDiscover on straydn. and a CDirDiscover while we're at it! + +/- slave request cleanup on failure +/ - flag request, and discard on re-dispatch? (cuz it'll be waiting on random stuff) + +/- fix slave op commit/abort logic: +/ - recovering node needs to know what stray prepare ops committed +/ - include with import_map +/ - wait for explicit commit/abort from peer. +/ - surviving node needs to +/ - wait for log to flush (commits to finish), then +/ - for uncommitted master requests, +/ - remove failed from witnesses, waiting_on_slave, and +/ - redispatch +/ - somehow wait for needed peers to recover... +/ - for uncommitted slave requests, +/ - include with import_map, wait for explicit commit/abort from peer. + +- make unlink/link behave with commit/abort recovery - new thrashing test with - link, unlink, and rename (lots of hard links!) -- check/fix mdr->slaves, wrt slave ops (authpin etc.) that 'fail' - - fix up writeback of dir inode mtime - - revisit wrlocks, dir inode mtime updates. esp in rename. - if auth, pin and be happy. decide early. - make no attempt to dirty inodes until a gather diff --git a/branches/sage/cephmds2/client/SyntheticClient.cc b/branches/sage/cephmds2/client/SyntheticClient.cc index 77cf529f49f41..834013799ef77 100644 --- a/branches/sage/cephmds2/client/SyntheticClient.cc +++ b/branches/sage/cephmds2/client/SyntheticClient.cc @@ -1496,7 +1496,7 @@ int SyntheticClient::thrash_links(const char *basedir, int dirs, int files, int sprintf(t, "/file.%d", a); src += t; } - string dst; + string dst = basedir; { char t[80]; for (int d=0; dget_name()), replica_nonce(nonce), lockstate(dn->lock.get_replica_state()), - ino(dn->get_ino()), + //ino(dn->get_ino()), remote_ino(dn->get_remote_ino()) { } string& get_dname() { return dname; } @@ -274,16 +274,18 @@ public: void _encode(bufferlist& bl) { ::_encode(dname, bl); - bl.append((char*)&replica_nonce, sizeof(replica_nonce)); - bl.append((char*)&lockstate, sizeof(lockstate)); + //::_encode(ino, bl); + ::_encode(remote_ino, bl); + ::_encode(replica_nonce, bl); + ::_encode(lockstate, bl); } void _decode(bufferlist& bl, int& off) { ::_decode(dname, bl, off); - bl.copy(off, sizeof(replica_nonce), (char*)&replica_nonce); - off += sizeof(replica_nonce); - bl.copy(off, sizeof(lockstate), (char*)&lockstate); - off += sizeof(lockstate); + //::_decode(ino, bl, off); + ::_decode(remote_ino, bl, off); + ::_decode(replica_nonce, bl, off); + ::_decode(lockstate, bl, off); } }; diff --git a/branches/sage/cephmds2/mds/CInode.cc b/branches/sage/cephmds2/mds/CInode.cc index 610cafe2fd3af..65a7938670d78 100644 --- a/branches/sage/cephmds2/mds/CInode.cc +++ b/branches/sage/cephmds2/mds/CInode.cc @@ -273,7 +273,10 @@ void CInode::make_path(string& s) s = ""; // root } else if (is_stray()) { - s = "~"; + s = "~stray"; + char n[10]; + sprintf(n, "%d", (int)(ino()-MDS_INO_STRAY_OFFSET)); + s += n; } else { s = "(dangling)"; // dangling @@ -288,7 +291,7 @@ void CInode::make_anchor_trace(vector& trace) dout(10) << "make_anchor_trace added " << trace.back() << endl; } else - assert(is_root()); + assert(is_root() || is_stray()); } void CInode::name_stray_dentry(string& dname) diff --git a/branches/sage/cephmds2/mds/MDCache.cc b/branches/sage/cephmds2/mds/MDCache.cc index 7776372314a73..9f00dac3b521a 100644 --- a/branches/sage/cephmds2/mds/MDCache.cc +++ b/branches/sage/cephmds2/mds/MDCache.cc @@ -47,6 +47,7 @@ #include "messages/MGenericMessage.h" #include "messages/MMDSImportMap.h" +#include "messages/MMDSResolveAck.h" #include "messages/MMDSCacheRejoin.h" #include "messages/MDiscover.h" @@ -296,6 +297,23 @@ void MDCache::open_foreign_stray(int who, Context *c) } +CDentry *MDCache::get_or_create_stray_dentry(CInode *in) +{ + string straydname; + in->name_stray_dentry(straydname); + frag_t fg = stray->pick_dirfrag(straydname); + + CDir *straydir = stray->get_or_open_dirfrag(this, fg); + + CDentry *straydn = straydir->lookup(straydname); + if (!straydn) + straydn = straydir->add_dentry(straydname, 0); + + return straydn; +} + + + MDSCacheObject *MDCache::get_object(MDSCacheObjectInfo &info) { // inode? @@ -961,33 +979,27 @@ void MDCache::send_import_map_now(int who) m->add_ambiguous_import(p->first, p->second); - // [survivor] list requests that may have slave PREPARE events journaled + // list prepare requests lacking a commit + // [active survivor] for (hash_map::iterator p = active_requests.begin(); p != active_requests.end(); ++p) { - // might this slave have a PREPARE journaled? - // or, were we just waiting on this slave? - if (p->second->is_master() && - (p->second->witnessed.count(who) || - p->second->waiting_on_slave.count(who))) { - if (p->second->committing) { - dout(10) << " committing " << *p->second << ", waiting for log to flush" << endl; - mds->mdlog->wait_for_sync(new C_MDC_SendImportMap(this, who)); - delete m; - return; - } - + if (p->second->is_slave() && p->second->slave_to_mds == who) { dout(10) << " including uncommitted " << *p->second << endl; - m->add_master_request(p->first); - - // discard this peer's prepare (if any) - p->second->witnessed.erase(who); - p->second->waiting_on_slave.erase(who); - - // retry request when peer recovers! - mds->wait_for_active_peer(who, new C_MDS_RetryRequest(this, p->second)); + m->add_slave_request(p->first); } } + // [resolving] + if (uncommitted_slave_updates.count(who)) { + for (map::iterator p = uncommitted_slave_updates[who].begin(); + p != uncommitted_slave_updates[who].end(); + ++p) { + dout(10) << " including uncommitted " << p->first << endl; + m->add_slave_request(p->first); + } + need_resolve_ack.insert(who); + } + // send mds->send_message_mds(m, who, MDS_PORT_CACHE); @@ -1057,16 +1069,43 @@ void MDCache::handle_mds_failure(int who) p = n; } - // clean up any slave requests from this node + // clean up any requests slave to/from this node list finish; for (hash_map::iterator p = active_requests.begin(); p != active_requests.end(); ++p) { // slave to the failed node? - if (p->second->slave_to_mds == who) - finish.push_back(p->second); - + if (p->second->slave_to_mds == who) { + if (p->second->slave_did_prepare()) { + dout(10) << " slave request " << *p->second << " uncommitted, will resolve shortly" << endl; + } else { + dout(10) << " slave request " << *p->second << " has no prepare, finishing up" << endl; + if (p->second->slave_request) + p->second->aborted = true; + else + finish.push_back(p->second); + } + } + + // failed node is slave? + if (!p->second->committing) { + if (p->second->witnessed.count(who)) { + dout(10) << " master request " << *p->second << " no longer witnessed by slave mds" << who + << endl; + // discard this peer's prepare (if any) + p->second->witnessed.erase(who); + } + + if (p->second->waiting_on_slave.count(who)) { + dout(10) << " master request " << *p->second << " waiting for slave mds" << who + << " to recover" << endl; + // retry request when peer recovers + p->second->waiting_on_slave.erase(who); + mds->wait_for_active_peer(who, new C_MDS_RetryRequest(this, p->second)); + } + } } + while (!finish.empty()) { dout(10) << "cleaning up slave request " << *finish.front() << endl; request_finish(finish.front()); @@ -1149,16 +1188,25 @@ void MDCache::handle_import_map(MMDSImportMap *m) dout(7) << "handle_import_map from " << m->get_source() << endl; int from = m->get_source().num(); - // try to disambiguate any unmatched slave prepares - for (list::iterator p = m->master_requests.begin(); - p != m->master_requests.end(); - ++p) { - if (uncommitted_slave_updates.count(*p)) { - // master request still exists; ABORT - dout(10) << " master request " << *p << " still in-progress, ABORTing our PREPARE" << endl; - uncommitted_slave_updates.erase(*p); - mds->mdlog->submit_entry(new ESlaveUpdate("unknown/recovered", *p, ESlaveUpdate::OP_ABORT)); + // ambiguous slave requests? + if (!m->slave_requests.empty()) { + MMDSResolveAck *ack = new MMDSResolveAck; + + for (list::iterator p = m->slave_requests.begin(); + p != m->slave_requests.end(); + ++p) { + if (mds->clientmap.have_completed_request(*p)) { + // COMMIT + dout(10) << " ambiguous slave request " << *p << " will COMMIT" << endl; + ack->add_commit(*p); + } else { + // ABORT + dout(10) << " ambiguous slave request " << *p << " will ABORT" << endl; + ack->add_abort(*p); + } } + + mds->send_message_mds(ack, from, MDS_PORT_CACHE); } // update my dir_auth values @@ -1201,9 +1249,9 @@ void MDCache::handle_import_map(MMDSImportMap *m) show_subtrees(); - // recovering? - if (!mds->is_rejoin() && !mds->is_active() && !mds->is_stopping()) { - // note ambiguous imports too.. unless i'm already active + // resolving? + if (mds->is_resolve()) { + // note ambiguous imports too for (map >::iterator pi = m->ambiguous_imap.begin(); pi != m->ambiguous_imap.end(); ++pi) { @@ -1213,39 +1261,90 @@ void MDCache::handle_import_map(MMDSImportMap *m) // did i get them all? got_import_map.insert(from); - - if (got_import_map == recovery_set) { - dout(10) << "got all import maps, done resolving subtrees" << endl; - commit_slave_updates(); - disambiguate_imports(); - recalc_auth_bits(); - trim_non_auth(); - - // reconnect clients - mds->set_want_state(MDSMap::STATE_RECONNECT); - } else { - dout(10) << "still waiting for more importmaps, got " << got_import_map - << ", need " << recovery_set << endl; - } + maybe_resolve_finish(); } delete m; } +void MDCache::maybe_resolve_finish() +{ + if (got_import_map != recovery_set) { + dout(10) << "still waiting for more importmaps, got " << got_import_map + << ", need " << recovery_set << endl; + } + else if (!need_resolve_ack.empty()) { + dout(10) << "still waiting for resolve_ack from " << need_resolve_ack << endl; + } + else { + dout(10) << "got all import maps, resolve_acks, done resolving subtrees" << endl; + disambiguate_imports(); + recalc_auth_bits(); + trim_non_auth(); + + // reconnect clients + mds->set_want_state(MDSMap::STATE_RECONNECT); + } +} -void MDCache::commit_slave_updates() +void MDCache::handle_resolve_ack(MMDSResolveAck *ack) { - dout(10) << "commit_slave_updates" << endl; - - while (!uncommitted_slave_updates.empty()) { - map::iterator p = uncommitted_slave_updates.begin(); - dout(10) << " committing slave update " << *p << endl; - p->second.replay(mds); - mds->mdlog->submit_entry(new ESlaveUpdate("unknown/recovered", p->first, ESlaveUpdate::OP_COMMIT)); + dout(10) << "handle_resolve_ack " << *ack << " from " << ack->get_source() << endl; + int from = ack->get_source().num(); + + for (list::iterator p = ack->commit.begin(); + p != ack->commit.end(); + ++p) { + dout(10) << " commit on slave " << *p << endl; + + if (mds->is_resolve()) { + // replay + assert(uncommitted_slave_updates[from].count(*p)); + uncommitted_slave_updates[from][*p].replay(mds); + uncommitted_slave_updates[from].erase(*p); + // log commit + mds->mdlog->submit_entry(new ESlaveUpdate("unknown", *p, from, ESlaveUpdate::OP_COMMIT)); + } else { + MDRequest *mdr = request_get(*p); + assert(mdr->slave_request == 0); // shouldn't be doing anything! + request_finish(mdr); + } } + + for (list::iterator p = ack->abort.begin(); + p != ack->abort.end(); + ++p) { + dout(10) << " abort on slave " << *p << endl; + + if (mds->is_resolve()) { + assert(uncommitted_slave_updates[from].count(*p)); + uncommitted_slave_updates[from].erase(*p); + mds->mdlog->submit_entry(new ESlaveUpdate("unknown", *p, from, ESlaveUpdate::OP_ABORT)); + } else { + MDRequest *mdr = request_get(*p); + if (mdr->slave_commit) { + mdr->slave_commit->finish(-1); + delete mdr->slave_commit; + mdr->slave_commit = 0; + } + if (mdr->slave_request) + mdr->aborted = true; + else + request_finish(mdr); + } + } + + need_resolve_ack.erase(from); + + if (mds->is_resolve()) + maybe_resolve_finish(); + + delete ack; } + + void MDCache::disambiguate_imports() { dout(10) << "disambiguate_imports" << endl; @@ -1644,7 +1743,7 @@ void MDCache::handle_cache_rejoin_rejoin(MMDSCacheRejoin *m) if (dn->is_replica(from) && (m->weak_dentries.count(dn->get_dir()->dirfrag()) == 0 || m->weak_dentries[dn->get_dir()->dirfrag()].count(dn->get_name()) == 0)) { - dn->remove_replica(from); + dentry_remove_replica(dn, from); dout(10) << " rem " << *dn << endl; } } @@ -2726,9 +2825,20 @@ void MDCache::inode_remove_replica(CInode *in, int from) mds->locker->simple_eval(&in->linklock); mds->locker->simple_eval(&in->dirfragtreelock); mds->locker->file_eval(&in->filelock); + mds->locker->scatter_eval(&in->dirlock); } } +void MDCache::dentry_remove_replica(CDentry *dn, int from) +{ + dn->remove_replica(from); + + // fix lock + if (dn->lock.remove_replica(from) || + !dn->is_replicated()) + mds->locker->simple_eval(&dn->lock); +} + // ========================================================================================= @@ -2957,6 +3067,10 @@ void MDCache::dispatch(Message *m) handle_import_map((MMDSImportMap*)m); break; + case MSG_MDS_RESOLVEACK: + handle_resolve_ack((MMDSResolveAck*)m); + break; + case MSG_MDS_CACHEREJOIN: handle_cache_rejoin((MMDSCacheRejoin*)m); break; @@ -3389,7 +3503,7 @@ CInode *MDCache::get_dentry_inode(CDentry *dn, MDRequest *mdr) dn->link_remote(in); return in; } else { - dout(10) << "get_dentry_ninode on remote dn, opening inode for " << *dn << endl; + dout(10) << "get_dentry_inode on remote dn, opening inode for " << *dn << endl; open_remote_ino(dn->get_remote_ino(), mdr, new C_MDS_RetryRequest(this, mdr)); return 0; } @@ -3444,11 +3558,13 @@ void MDCache::open_remote_ino_2(inodeno_t ino, CInode *in = 0; while (1) { // inode? + dout(10) << " " << i << ": " << anchortrace[i-1] << endl; CInode *in = get_inode(anchortrace[i-1].ino); if (in) break; i--; if (!i) { - in = root; + CInode *in = get_inode(anchortrace[i].dirfrag.ino); + assert(in); break; } } @@ -3562,6 +3678,13 @@ void MDCache::request_finish(MDRequest *mdr) { dout(7) << "request_finish " << *mdr << endl; + // slave finisher? + if (mdr->slave_commit) { + mdr->slave_commit->finish(0); + delete mdr->slave_commit; + mdr->slave_commit = 0; + } + delete mdr->client_request; delete mdr->slave_request; request_cleanup(mdr); @@ -4115,6 +4238,7 @@ void MDCache::handle_discover(MDiscover *dis) dout(7) << *cur << " dirfrag not open, not inode auth, setting dir_auth_hint" << endl; reply->set_dir_auth_hint(cur->authority().first); } + reply->set_wanted_xlocks_hint(dis->wants_xlocked()); // set hint (+ dentry, if there is one) if (dis->get_want().depth() > i) @@ -4401,16 +4525,19 @@ void MDCache::handle_discover_reply(MDiscoverReply *m) // let's try again. int hint = m->get_dir_auth_hint(); - // include any path fragment we were looking for at the time + + // include dentry _and_ dirfrag, just in case filepath want; - if (m->get_error_dentry().length() > 0) - want.push_dentry(m->get_error_dentry()); - - mds->send_message_mds(new MDiscover(mds->get_nodeid(), - cur->ino(), - want, - true), // being conservative here. - hint, MDS_PORT_CACHE); + want.push_dentry(m->get_error_dentry()); + MDiscover *dis = new MDiscover(mds->get_nodeid(), + cur->ino(), + want, + true, + m->get_wanted_xlocks_hint()); + frag_t fg = cur->pick_dirfrag(m->get_error_dentry()); + dis->set_base_dir_frag(fg); + + mds->send_message_mds(dis, hint, MDS_PORT_CACHE); // note the dangling discover dir_discovers[cur->ino()].insert(hint); @@ -4481,7 +4608,47 @@ CDir *MDCache::forge_replica_dir(CInode *diri, frag_t fg, int from) return dir; } + +CDentry *MDCache::add_replica_stray(bufferlist &bl, CInode *in, int from) +{ + int off = 0; + + // inode + CInodeDiscover indis; + indis._decode(bl, off); + CInode *strayin = get_inode(indis.get_ino()); + if (!strayin) + strayin = new CInode(this, false); + indis.update_inode(strayin); + dout(15) << "strayin " << *strayin << endl; + + // dir + CDirDiscover dirdis; + dirdis._decode(bl, off); + list finished; + CDir *straydir = add_replica_dir(strayin, dirdis.get_dirfrag().frag, dirdis, + from, finished); + mds->queue_waiters(finished); + dout(15) << "straydir " << *straydir << endl; + + // dentry + CDentryDiscover dndis; + dndis._decode(bl, off); + + string straydname; + in->name_stray_dentry(straydname); + CDentry *straydn = straydir->lookup(straydname); + if (straydn) { + dout(10) << "had straydn " << *straydn << endl; + dndis.update_dentry(straydn); + } else { + straydn = straydir->add_dentry( dndis.get_dname(), 0 ); + dndis.update_new_dentry(straydn); + dout(10) << "added straydn " << *straydn << endl; + } + return straydn; +} diff --git a/branches/sage/cephmds2/mds/MDCache.h b/branches/sage/cephmds2/mds/MDCache.h index bdef2a95e8b12..6d08495fe98e9 100644 --- a/branches/sage/cephmds2/mds/MDCache.h +++ b/branches/sage/cephmds2/mds/MDCache.h @@ -41,6 +41,7 @@ class Logger; class Message; class MMDSImportMap; +class MMDSResolveAck; class MMDSCacheRejoin; class MMDSCacheRejoinAck; class MDiscover; @@ -104,6 +105,7 @@ struct MDRequest { // (useful for wrlock, which may be a moving auth target) bool done_locking; bool committing; + bool aborted; // for rename/link/unlink utime_t now; @@ -126,25 +128,27 @@ struct MDRequest { MDRequest() : client_request(0), ref(0), slave_request(0), slave_to_mds(-1), - done_locking(false), committing(false), + done_locking(false), committing(false), aborted(false), src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0), slave_commit(0) { } MDRequest(metareqid_t ri, MClientRequest *req) : reqid(ri), client_request(req), ref(0), slave_request(0), slave_to_mds(-1), - done_locking(false), committing(false), + done_locking(false), committing(false), aborted(false), src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0), slave_commit(0) { } MDRequest(metareqid_t ri, int by) : reqid(ri), client_request(0), ref(0), slave_request(0), slave_to_mds(by), - done_locking(false), committing(false), + done_locking(false), committing(false), aborted(false), src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0), slave_commit(0) { } bool is_master() { return slave_to_mds < 0; } bool is_slave() { return slave_to_mds >= 0; } + bool slave_did_prepare() { return slave_commit; } + // pin items in cache void pin(MDSCacheObject *o) { if (pins.count(o) == 0) { @@ -296,21 +300,44 @@ public: protected: set recovery_set; +public: + void set_recovery_set(set& s); + void handle_mds_failure(int who); + void handle_mds_recovery(int who); + +protected: + // [resolve] // from EImportStart w/o EImportFinish during journal replay map > my_ambiguous_imports; // from MMDSImportMaps map > > other_ambiguous_imports; - map uncommitted_slave_updates; + map > uncommitted_slave_updates; friend class ESlaveUpdate; set wants_import_map; // nodes i need to send my import map to set got_import_map; // nodes i got import_maps from + set need_resolve_ack; // nodes i need a resolve_ack from void handle_import_map(MMDSImportMap *m); - void commit_slave_updates(); + void handle_resolve_ack(MMDSResolveAck *m); + void maybe_resolve_finish(); void disambiguate_imports(); + void recalc_auth_bits(); +public: + // ambiguous imports + void add_ambiguous_import(dirfrag_t base, list& bounds); + void add_ambiguous_import(CDir *base, const set& bounds); + void cancel_ambiguous_import(dirfrag_t dirino); + void finish_ambiguous_import(dirfrag_t dirino); + void send_import_map(int who); + void send_import_map_now(int who); + void send_import_map_later(int who); + void send_pending_import_maps(); // maybe. + void log_import_map(Context *onsync=0); +protected: + // [rejoin] set rejoin_gather; // nodes from whom i need a rejoin set rejoin_ack_gather; // nodes from whom i need a rejoin ack set want_rejoin_ack; // nodes to whom i need to send a rejoin ack @@ -322,26 +349,10 @@ protected: void handle_cache_rejoin_missing(MMDSCacheRejoin *m); void handle_cache_rejoin_full(MMDSCacheRejoin *m); void send_cache_rejoin_acks(); - void recalc_auth_bits(); - public: - void set_recovery_set(set& s); - void handle_mds_failure(int who); - void handle_mds_recovery(int who); - void send_import_map(int who); - void send_import_map_now(int who); - void send_import_map_later(int who); - void send_pending_import_maps(); // maybe. void send_cache_rejoins(); - void log_import_map(Context *onsync=0); - // ambiguous imports - void add_ambiguous_import(dirfrag_t base, list& bounds); - void add_ambiguous_import(CDir *base, const set& bounds); - void cancel_ambiguous_import(dirfrag_t dirino); - void finish_ambiguous_import(dirfrag_t dirino); - friend class Locker; @@ -430,6 +441,7 @@ public: } void inode_remove_replica(CInode *in, int rep); + void dentry_remove_replica(CDentry *dn, int rep); void rename_file(CDentry *srcdn, CDentry *destdn); @@ -462,6 +474,7 @@ public: CInode *create_stray_inode(int whose=-1); void open_local_stray(); void open_foreign_stray(int who, Context *c); + CDentry *get_or_create_stray_dentry(CInode *in); Context *_get_waiter(MDRequest *mdr, Message *req); int path_traverse(MDRequest *mdr, Message *req, @@ -518,6 +531,10 @@ protected: int from, list& finished); CDir* forge_replica_dir(CInode *diri, frag_t fg, int from); +public: + CDentry *add_replica_stray(bufferlist &bl, CInode *strayin, int from); +protected: + // -- namespace -- diff --git a/branches/sage/cephmds2/mds/MDS.cc b/branches/sage/cephmds2/mds/MDS.cc index 5292888baff89..33d24b30d2cde 100644 --- a/branches/sage/cephmds2/mds/MDS.cc +++ b/branches/sage/cephmds2/mds/MDS.cc @@ -605,9 +605,9 @@ void MDS::handle_mds_map(MMDSMap *m) dout(10) << "i am newly resolving, sharing import map" << endl; set who; mdsmap->get_mds_set(who, MDSMap::STATE_RESOLVE); + mdsmap->get_mds_set(who, MDSMap::STATE_REJOIN); mdsmap->get_mds_set(who, MDSMap::STATE_ACTIVE); mdsmap->get_mds_set(who, MDSMap::STATE_STOPPING); - mdsmap->get_mds_set(who, MDSMap::STATE_REJOIN); // hrm. FIXME. for (set::iterator p = who.begin(); p != who.end(); ++p) { if (*p == whoami) continue; mdcache->send_import_map(*p); // now. diff --git a/branches/sage/cephmds2/mds/Server.cc b/branches/sage/cephmds2/mds/Server.cc index 50e930fb304ff..bc15a2c01ed79 100644 --- a/branches/sage/cephmds2/mds/Server.cc +++ b/branches/sage/cephmds2/mds/Server.cc @@ -599,6 +599,12 @@ void Server::dispatch_slave_request(MDRequest *mdr) { dout(7) << "dispatch_slave_request " << *mdr << " " << *mdr->slave_request << endl; + if (mdr->aborted) { + dout(7) << " abort flag set, finishing" << endl; + mdcache->request_finish(mdr); + return; + } + switch (mdr->slave_request->get_op()) { case MMDSSlaveRequest::OP_XLOCK: { @@ -628,7 +634,7 @@ void Server::dispatch_slave_request(MDRequest *mdr) << *lock << " on " << *lock->get_parent() << endl; } else { dout(10) << "don't have object, dropping" << endl; - assert(0); // can this happen? hmm. + assert(0); // can this happen, if we auth pinned properly. } } @@ -669,13 +675,6 @@ void Server::dispatch_slave_request(MDRequest *mdr) break; case MMDSSlaveRequest::OP_FINISH: - // slave finisher? - if (mdr->slave_commit) { - mdr->slave_commit->finish(0); - delete mdr->slave_commit; - mdr->slave_commit = 0; - } - // finish off request. mdcache->request_finish(mdr); break; @@ -2089,7 +2088,7 @@ void Server::handle_slave_link_prep(MDRequest *mdr) } // journal it - ESlaveUpdate *le = new ESlaveUpdate("slave_link_prep", mdr->reqid, ESlaveUpdate::OP_PREPARE); + ESlaveUpdate *le = new ESlaveUpdate("slave_link_prep", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_PREPARE); version_t tpv = targeti->pre_dirty(); @@ -2122,8 +2121,7 @@ public: C_MDS_SlaveLinkCommit(Server *s, MDRequest *r, CInode *t, version_t v, bool in) : server(s), mdr(r), targeti(t), tpv(v), inc(in) { } void finish(int r) { - assert(r == 0); - server->_commit_slave_link(mdr, targeti, tpv, inc); + server->_commit_slave_link(mdr, r, targeti, tpv, inc); } }; @@ -2145,22 +2143,33 @@ void Server::_logged_slave_link(MDRequest *mdr, CInode *targeti, version_t tpv, mdr->slave_request = 0; } -void Server::_commit_slave_link(MDRequest *mdr, CInode *targeti, version_t tpv, bool inc) +void Server::_commit_slave_link(MDRequest *mdr, int r, CInode *targeti, version_t tpv, bool inc) { dout(10) << "_commit_slave_link " << *mdr + << " r=" << r << " inc=" << inc << " " << *targeti << endl; - // update the target - if (inc) - targeti->inode.nlink++; - else - targeti->inode.nlink--; - targeti->inode.ctime = mdr->now; - targeti->mark_dirty(tpv); + ESlaveUpdate *le; + + if (r == 0) { + // commit. + + // update the target + if (inc) + targeti->inode.nlink++; + else + targeti->inode.nlink--; + targeti->inode.ctime = mdr->now; + targeti->mark_dirty(tpv); + + // write a commit to the journal + le = new ESlaveUpdate("slave_link_commit", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_COMMIT); + } else { + // abort + le = new ESlaveUpdate("slave_link_abort", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_ABORT); + } - // write a commit to the journal - ESlaveUpdate *le = new ESlaveUpdate("slave_link_commit", mdr->reqid, ESlaveUpdate::OP_COMMIT); mds->mdlog->submit_entry(le); } @@ -2280,19 +2289,24 @@ void Server::handle_client_unlink(MDRequest *mdr) // yay! mdr->done_locking = true; // avoid wrlock racing - if (mdr->now == utime_t()) mdr->now = g_clock.real_now(); // get stray dn ready? CDentry *straydn = 0; if (dn->is_primary()) { - string straydname; - dn->inode->name_stray_dentry(straydname); - frag_t fg = mdcache->get_stray()->pick_dirfrag(straydname); - CDir *straydir = mdcache->get_stray()->get_or_open_dirfrag(mdcache, fg); - straydn = straydir->add_dentry(straydname, 0); + straydn = mdcache->get_or_create_stray_dentry(dn->inode); dout(10) << " straydn is " << *straydn << endl; + + if (!mdr->dst_reanchor_atid && + dn->inode->is_anchored()) { + dout(10) << "reanchoring to stray " << *dn->inode << endl; + vector trace; + straydn->make_anchor_trace(trace, dn->inode); + mds->anchorclient->prepare_update(dn->inode->ino(), trace, &mdr->dst_reanchor_atid, + new C_MDS_RetryRequest(mdcache, mdr)); + return; + } } // ok! @@ -2360,7 +2374,10 @@ void Server::_unlink_local(MDRequest *mdr, CDentry *dn, CDentry *straydn) pi->nlink--; pi->ctime = mdr->now; pi->version = ipv; - + + if (mdr->dst_reanchor_atid) + le->metablob.add_anchor_transaction(mdr->dst_reanchor_atid); + // finisher C_MDS_unlink_local_finish *fin = new C_MDS_unlink_local_finish(mds, mdr, dn, straydn, ipv, dirpv); @@ -2412,11 +2429,15 @@ void Server::_unlink_local_finish(MDRequest *mdr, } mds->send_message_mds(unlink, it->first, MDS_PORT_CACHE); } - + // reply MClientReply *reply = new MClientReply(mdr->client_request, 0); reply_request(mdr, reply, dn->dir->get_inode()); // FIXME: imprecise ref + // commit anchor update? + if (mdr->dst_reanchor_atid) + mds->anchorclient->commit(mdr->dst_reanchor_atid); + // clean up? if (straydn) mdcache->eval_stray(straydn); @@ -2471,6 +2492,9 @@ void Server::_unlink_remote(MDRequest *mdr, CDentry *dn) le->metablob.add_dir_context(dn->get_dir()); le->metablob.add_null_dentry(dn, true); + if (mdr->dst_reanchor_atid) + le->metablob.add_anchor_transaction(mdr->dst_reanchor_atid); + // finisher C_MDS_unlink_remote_finish *fin = new C_MDS_unlink_remote_finish(mds, mdr, dn, dirpv); @@ -2514,6 +2538,10 @@ void Server::_unlink_remote_finish(MDRequest *mdr, // reply MClientReply *reply = new MClientReply(mdr->client_request, 0); reply_request(mdr, reply, dn->dir->get_inode()); // FIXME: imprecise ref + + // commit anchor update? + if (mdr->dst_reanchor_atid) + mds->anchorclient->commit(mdr->dst_reanchor_atid); } @@ -2727,6 +2755,13 @@ void Server::handle_client_rename(MDRequest *mdr) if (mdr->now == utime_t()) mdr->now = g_clock.real_now(); + // -- create stray dentry? -- + CDentry *straydn = 0; + if (destdn->is_primary()) { + straydn = mdcache->get_or_create_stray_dentry(destdn->inode); + dout(10) << "straydn is " << *straydn << endl; + } + // -- prepare witnesses -- set witnesses = mdr->extra_witnesses; if (srcdn->is_auth()) @@ -2746,6 +2781,18 @@ void Server::handle_client_rename(MDRequest *mdr) srcdn->make_path(req->srcdnpath); destdn->make_path(req->destdnpath); req->now = mdr->now; + + if (straydn) { + CInodeDiscover *indis = straydn->dir->inode->replicate_to(*p); + CDirDiscover *dirdis = straydn->dir->replicate_to(*p); + CDentryDiscover *dndis = straydn->replicate_to(*p); + indis->_encode(req->stray); + dirdis->_encode(req->stray); + dndis->_encode(req->stray); + delete dirdis; + delete dndis; + } + mds->send_message_mds(req, *p, MDS_PORT_SERVER); assert(mdr->waiting_on_slave.count(*p) == 0); @@ -2773,12 +2820,6 @@ void Server::handle_client_rename(MDRequest *mdr) dout(10) << " already (just!) got inode export from srcdn auth" << endl; } - // -- prepare journal entry -- - EUpdate *le = new EUpdate("rename"); - le->metablob.add_client_req(mdr->reqid); - - CDentry *straydn = _rename_prepare(mdr, &le->metablob, srcdn, destdn); - // -- prepare anchor updates -- bool linkmerge = (srcdn->inode == destdn->inode && (srcdn->is_primary() || destdn->is_primary())); @@ -2801,6 +2842,8 @@ void Server::handle_client_rename(MDRequest *mdr) destdn->inode->is_anchored() && !mdr->dst_reanchor_atid) { dout(10) << "reanchoring dst->stray " << *destdn->inode << endl; + + assert(straydn); vector trace; straydn->make_anchor_trace(trace, destdn->inode); @@ -2814,11 +2857,18 @@ void Server::handle_client_rename(MDRequest *mdr) return; // waiting for anchor prepares } + // -- prepare journal entry -- + EUpdate *le = new EUpdate("rename"); + le->metablob.add_client_req(mdr->reqid); + + _rename_prepare(mdr, &le->metablob, srcdn, destdn, straydn); // -- commit locally -- C_MDS_rename_finish *fin = new C_MDS_rename_finish(mds, mdr, srcdn, destdn, straydn); // and apply! + // we do this now because we may also be importing an inode, and the locker is currently + // depending on a this happening quickly. _rename_apply(mdr, srcdn, destdn, straydn); journal_opens(); // journal pending opens, just in case @@ -2856,9 +2906,9 @@ void Server::_rename_finish(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDe // helpers -CDentry *Server::_rename_prepare(MDRequest *mdr, - EMetaBlob *metablob, - CDentry *srcdn, CDentry *destdn) +void Server::_rename_prepare(MDRequest *mdr, + EMetaBlob *metablob, + CDentry *srcdn, CDentry *destdn, CDentry *straydn) { dout(10) << "_rename_prepare " << *mdr << " " << *srcdn << " " << *destdn << endl; @@ -2868,7 +2918,6 @@ CDentry *Server::_rename_prepare(MDRequest *mdr, inode_t *pi = 0; // inode getting nlink-- version_t ipv; // it's version - CDentry *straydn = 0; if (linkmerge) { dout(10) << "will merge remote+primary links" << endl; @@ -2886,18 +2935,10 @@ CDentry *Server::_rename_prepare(MDRequest *mdr, metablob->add_null_dentry(srcdn, true); } else { - // move to stray? if (destdn->is_primary()) { - // primary. - // move inode to stray dir. - string straydname; - destdn->inode->name_stray_dentry(straydname); - frag_t fg = mdcache->get_stray()->pick_dirfrag(straydname); - CDir *straydir = mdcache->get_stray()->get_or_open_dirfrag(mdcache, fg); - straydn = straydir->add_dentry(straydname, 0); - dout(10) << "straydn is " << *straydn << endl; - mdr->pin(straydn); + // primary. we'll move inode to stray dir. + assert(straydn); // link-- inode, move to stray dir. metablob->add_dir_context(straydn->dir); @@ -2909,7 +2950,7 @@ CDentry *Server::_rename_prepare(MDRequest *mdr, // remote. // nlink-- targeti metablob->add_dir_context(destdn->inode->get_parent_dir()); - if (destdn->is_auth()) + if (destdn->inode->is_auth()) ipv = mdr->pvmap[destdn->inode] = destdn->inode->pre_dirty(); pi = metablob->add_primary_dentry(destdn->inode->parent, true, destdn->inode); // update primary dout(10) << "remote targeti (nlink--) is " << *destdn->inode << endl; @@ -2950,7 +2991,11 @@ CDentry *Server::_rename_prepare(MDRequest *mdr, pi->version = ipv; } - return straydn; + // anchor updates? + if (mdr->src_reanchor_atid) + metablob->add_anchor_transaction(mdr->src_reanchor_atid); + if (mdr->dst_reanchor_atid) + metablob->add_anchor_transaction(mdr->dst_reanchor_atid); } @@ -3048,7 +3093,7 @@ void Server::_rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDen srcdn->mark_dirty(mdr->pvmap[srcdn]); // srcdn inode import? - if (!srcdn->is_auth() && destdn->is_auth()) { + if (!srcdn->is_auth() && destdn->is_primary() && destdn->is_auth()) { assert(mdr->inode_import.length() > 0); int off = 0; mdcache->migrator->decode_import_inode(destdn, mdr->inode_import, off, @@ -3057,7 +3102,7 @@ void Server::_rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDen } // update subtree map? - if (destdn->inode->is_dir()) + if (destdn->is_primary() && destdn->inode->is_dir()) mdcache->adjust_subtree_after_rename(destdn->inode, srcdn->dir); } @@ -3088,7 +3133,7 @@ public: C_MDS_SlaveRenameCommit(Server *s, MDRequest *m, CDentry *sr, CDentry *de, CDentry *st) : server(s), mdr(m), srcdn(sr), destdn(de), straydn(st) {} void finish(int r) { - server->_commit_slave_rename(mdr, srcdn, destdn, straydn); + server->_commit_slave_rename(mdr, r, srcdn, destdn, straydn); } }; @@ -3126,30 +3171,21 @@ void Server::handle_slave_rename_prep(MDRequest *mdr) dout(10) << " srcdn " << *srcdn << endl; mdr->pin(srcdn); - // open destdn stray dirfrag? + // stray? + CDentry *straydn = 0; if (destdn->is_primary()) { - CInode *dstray = mdcache->get_inode(MDS_INO_STRAY(mdr->slave_to_mds)); - if (!dstray) { - mdcache->open_foreign_stray(mdr->slave_to_mds, new C_MDS_RetryRequest(mdcache, mdr)); - return; - } - - string straydname; - destdn->inode->name_stray_dentry(straydname); - frag_t fg = dstray->pick_dirfrag(straydname); - CDir *straydir = dstray->get_dirfrag(fg); - if (!straydir) { - mdcache->open_remote_dir(dstray, fg, new C_MDS_RetryRequest(mdcache, mdr)); - return; - } - dout(10) << " straydir is " << *straydir << endl; + assert(mdr->slave_request->stray.length() > 0); + straydn = mdcache->add_replica_stray(mdr->slave_request->stray, + destdn->inode, mdr->slave_to_mds); + assert(straydn); + mdr->pin(straydn); } // journal it - ESlaveUpdate *le = new ESlaveUpdate("slave_rename_prep", mdr->reqid, ESlaveUpdate::OP_PREPARE); + ESlaveUpdate *le = new ESlaveUpdate("slave_rename_prep", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_PREPARE); mdr->now = mdr->slave_request->now; - CDentry *straydn = _rename_prepare(mdr, &le->metablob, srcdn, destdn); + _rename_prepare(mdr, &le->metablob, srcdn, destdn, straydn); mds->mdlog->submit_entry(le, new C_MDS_SlaveRenamePrep(this, mdr, srcdn, destdn, straydn)); } @@ -3179,14 +3215,22 @@ void Server::_logged_slave_rename(MDRequest *mdr, mdr->slave_request = 0; } -void Server::_commit_slave_rename(MDRequest *mdr, +void Server::_commit_slave_rename(MDRequest *mdr, int r, CDentry *srcdn, CDentry *destdn, CDentry *straydn) { - dout(10) << "_commit_slave_rename " << *mdr << endl; - _rename_apply(mdr, srcdn, destdn, straydn); + dout(10) << "_commit_slave_rename " << *mdr << " r=" << r << endl; - // write a commit to the journal - ESlaveUpdate *le = new ESlaveUpdate("slave_rename_commit", mdr->reqid, ESlaveUpdate::OP_COMMIT); + ESlaveUpdate *le; + if (r == 0) { + // commit + _rename_apply(mdr, srcdn, destdn, straydn); + + // write a commit to the journal + le = new ESlaveUpdate("slave_rename_commit", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_COMMIT); + } else { + // abort + le = new ESlaveUpdate("slave_rename_abort", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_ABORT); + } mds->mdlog->submit_entry(le); } diff --git a/branches/sage/cephmds2/mds/Server.h b/branches/sage/cephmds2/mds/Server.h index d14bf069e1d97..b7e2197c522e5 100644 --- a/branches/sage/cephmds2/mds/Server.h +++ b/branches/sage/cephmds2/mds/Server.h @@ -129,7 +129,7 @@ public: void handle_slave_link_prep(MDRequest *mdr); void _logged_slave_link(MDRequest *mdr, CInode *targeti, version_t tpv, bool inc); - void _commit_slave_link(MDRequest *mdr, CInode *targeti, version_t tpv, bool inc); + void _commit_slave_link(MDRequest *mdr, int r, CInode *targeti, version_t tpv, bool inc); void handle_slave_link_prep_ack(MDRequest *mdr, MMDSSlaveRequest *m); // unlink @@ -151,16 +151,16 @@ public: CDentry *srcdn, CDentry *destdn, CDentry *straydn); // helpers - CDentry *_rename_prepare(MDRequest *mdr, - EMetaBlob *metablob, - CDentry *srcdn, CDentry *destdn); + void _rename_prepare(MDRequest *mdr, + EMetaBlob *metablob, + CDentry *srcdn, CDentry *destdn, CDentry *straydn); void _rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDentry *straydn); // slaving void handle_slave_rename_prep(MDRequest *mdr); void handle_slave_rename_prep_ack(MDRequest *mdr, MMDSSlaveRequest *m); void _logged_slave_rename(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDentry *straydn); - void _commit_slave_rename(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDentry *straydn); + void _commit_slave_rename(MDRequest *mdr, int r, CDentry *srcdn, CDentry *destdn, CDentry *straydn); void handle_slave_rename_get_inode(MDRequest *mdr); void handle_slave_rename_get_inode_ack(MDRequest *mdr, MMDSSlaveRequest *m); diff --git a/branches/sage/cephmds2/mds/events/ESlaveUpdate.h b/branches/sage/cephmds2/mds/events/ESlaveUpdate.h index 6550d2838297f..f83dc87778a0f 100644 --- a/branches/sage/cephmds2/mds/events/ESlaveUpdate.h +++ b/branches/sage/cephmds2/mds/events/ESlaveUpdate.h @@ -26,14 +26,16 @@ public: string type; metareqid_t reqid; + int master; int op; // prepare, commit, abort EMetaBlob metablob; ESlaveUpdate() : LogEvent(EVENT_SLAVEUPDATE) { } - ESlaveUpdate(const char *s, metareqid_t ri, int o) : + ESlaveUpdate(const char *s, metareqid_t ri, int mastermds, int o) : LogEvent(EVENT_SLAVEUPDATE), type(s), reqid(ri), + master(mastermds), op(o) { } void print(ostream& out) { @@ -41,18 +43,21 @@ public: out << type << " "; out << " " << op; out << " " << reqid; + out << " for mds" << master; out << metablob; } void encode_payload(bufferlist& bl) { ::_encode(type, bl); ::_encode(reqid, bl); + ::_encode(master, bl); ::_encode(op, bl); metablob._encode(bl); } void decode_payload(bufferlist& bl, int& off) { ::_decode(type, bl, off); ::_decode(reqid, bl, off); + ::_decode(master, bl, off); ::_decode(op, bl, off); metablob._decode(bl, off); } diff --git a/branches/sage/cephmds2/mds/journal.cc b/branches/sage/cephmds2/mds/journal.cc index 5c522d2fa2850..da57c55e471e2 100644 --- a/branches/sage/cephmds2/mds/journal.cc +++ b/branches/sage/cephmds2/mds/journal.cc @@ -731,29 +731,34 @@ void ESlaveUpdate::replay(MDS *mds) { switch (op) { case ESlaveUpdate::OP_PREPARE: - // FIXME: horribly inefficient - dout(10) << "ESlaveUpdate.replay prepare " << reqid << ": saving blob for later commit" << endl; - assert(mds->mdcache->uncommitted_slave_updates.count(reqid) == 0); - mds->mdcache->uncommitted_slave_updates[reqid] = metablob; + // FIXME: horribly inefficient copy; EMetaBlob needs a swap() or something + dout(10) << "ESlaveUpdate.replay prepare " << reqid << " for mds" << master + << ": saving blob for later commit" << endl; + assert(mds->mdcache->uncommitted_slave_updates[master].count(reqid) == 0); + mds->mdcache->uncommitted_slave_updates[master][reqid] = metablob; break; case ESlaveUpdate::OP_COMMIT: - if (mds->mdcache->uncommitted_slave_updates.count(reqid)) { - dout(10) << "ESlaveUpdate.replay commit " << reqid << ": applying previously saved blob" << endl; - mds->mdcache->uncommitted_slave_updates[reqid].replay(mds); - mds->mdcache->uncommitted_slave_updates.erase(reqid); + if (mds->mdcache->uncommitted_slave_updates[master].count(reqid)) { + dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master + << ": applying previously saved blob" << endl; + mds->mdcache->uncommitted_slave_updates[master][reqid].replay(mds); + mds->mdcache->uncommitted_slave_updates[master].erase(reqid); } else { - dout(10) << "ESlaveUpdate.replay commit " << reqid << ": ignoring, no previously saved blob" << endl; + dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master + << ": ignoring, no previously saved blob" << endl; } break; case ESlaveUpdate::OP_ABORT: - if (mds->mdcache->uncommitted_slave_updates.count(reqid)) { - dout(10) << "ESlaveUpdate.replay abort " << reqid << ": discarding previously saved blob" << endl; - assert(mds->mdcache->uncommitted_slave_updates.count(reqid)); - mds->mdcache->uncommitted_slave_updates.erase(reqid); + if (mds->mdcache->uncommitted_slave_updates[master].count(reqid)) { + dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds" << master + << ": discarding previously saved blob" << endl; + assert(mds->mdcache->uncommitted_slave_updates[master].count(reqid)); + mds->mdcache->uncommitted_slave_updates[master].erase(reqid); } else { - dout(10) << "ESlaveUpdate.replay abort " << reqid << ": ignoring, no previously saved blob" << endl; + dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds" << master + << ": ignoring, no previously saved blob" << endl; } break; diff --git a/branches/sage/cephmds2/messages/MDiscoverReply.h b/branches/sage/cephmds2/messages/MDiscoverReply.h index 4367e8177a052..5821bc85db38e 100644 --- a/branches/sage/cephmds2/messages/MDiscoverReply.h +++ b/branches/sage/cephmds2/messages/MDiscoverReply.h @@ -76,7 +76,8 @@ class MDiscoverReply : public Message { bool flag_error_ino; bool flag_error_dir; string error_dentry; // dentry that was not found (to trigger waiters on asker) - int dir_auth_hint; + int dir_auth_hint; + bool wanted_xlocks_hint; vector dirs; // not inode-aligned if no_base_dir = true. vector dentries; // not inode-aligned if no_base_dentry = true @@ -113,6 +114,9 @@ class MDiscoverReply : public Message { bool is_flag_error_dir() { return flag_error_dir; } string& get_error_dentry() { return error_dentry; } int get_dir_auth_hint() { return dir_auth_hint; } + bool get_wanted_xlocks_hint() { return wanted_xlocks_hint; } + + void set_wanted_xlocks_hint(bool w) { wanted_xlocks_hint = w; } // these index _arguments_ are aligned to each ([[dir, ] dentry, ] inode) set. CInodeDiscover& get_inode(int n) { return *(inodes[n]); } @@ -199,6 +203,7 @@ class MDiscoverReply : public Message { ::_decode(flag_error_dir, payload, off); ::_decode(error_dentry, payload, off); ::_decode(dir_auth_hint, payload, off); + ::_decode(wanted_xlocks_hint, payload, off); // dirs int n; @@ -236,6 +241,7 @@ class MDiscoverReply : public Message { ::_encode(flag_error_dir, payload); ::_encode(error_dentry, payload); ::_encode(dir_auth_hint, payload); + ::_encode(wanted_xlocks_hint, payload); // dirs int n = dirs.size(); diff --git a/branches/sage/cephmds2/messages/MMDSImportMap.h b/branches/sage/cephmds2/messages/MMDSImportMap.h index 49d4a9b35190e..d83d5681ad71e 100644 --- a/branches/sage/cephmds2/messages/MMDSImportMap.h +++ b/branches/sage/cephmds2/messages/MMDSImportMap.h @@ -24,7 +24,7 @@ class MMDSImportMap : public Message { public: map > imap; map > ambiguous_imap; - list master_requests; + list slave_requests; MMDSImportMap() : Message(MSG_MDS_IMPORTMAP) {} @@ -33,7 +33,7 @@ class MMDSImportMap : public Message { void print(ostream& out) { out << "mdsimportmap(" << imap.size() << "+" << ambiguous_imap.size() - << " imports +" << master_requests.size() << " requests)"; + << " imports +" << slave_requests.size() << " slave requests)"; } void add_import(dirfrag_t im) { @@ -47,20 +47,20 @@ class MMDSImportMap : public Message { ambiguous_imap[im] = m; } - void add_master_request(metareqid_t reqid) { - master_requests.push_back(reqid); + void add_slave_request(metareqid_t reqid) { + slave_requests.push_back(reqid); } void encode_payload() { ::_encode(imap, payload); ::_encode(ambiguous_imap, payload); - ::_encode(master_requests, payload); + ::_encode(slave_requests, payload); } void decode_payload() { int off = 0; ::_decode(imap, payload, off); ::_decode(ambiguous_imap, payload, off); - ::_decode(master_requests, payload, off); + ::_decode(slave_requests, payload, off); } }; diff --git a/branches/sage/cephmds2/messages/MMDSResolveAck.h b/branches/sage/cephmds2/messages/MMDSResolveAck.h new file mode 100644 index 0000000000000..1870e226b4161 --- /dev/null +++ b/branches/sage/cephmds2/messages/MMDSResolveAck.h @@ -0,0 +1,56 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __MMDSRESOLVEACK_H +#define __MMDSRESOLVEACK_H + +#include "msg/Message.h" + +#include "include/types.h" + + +class MMDSResolveAck : public Message { + public: + list commit; + list abort; + + MMDSResolveAck() : Message(MSG_MDS_RESOLVEACK) {} + + char *get_type_name() { return "resolve_ack"; } + /*void print(ostream& out) { + out << "resolve_ack.size() + << "+" << ambiguous_imap.size() + << " imports +" << slave_requests.size() << " slave requests)"; + } + */ + + void add_commit(metareqid_t r) { + commit.push_back(r); + } + void add_abort(metareqid_t r) { + abort.push_back(r); + } + + void encode_payload() { + ::_encode(commit, payload); + ::_encode(abort, payload); + } + void decode_payload() { + int off = 0; + ::_decode(commit, payload, off); + ::_decode(abort, payload, off); + } +}; + +#endif diff --git a/branches/sage/cephmds2/messages/MMDSSlaveRequest.h b/branches/sage/cephmds2/messages/MMDSSlaveRequest.h index 1e540a4b01018..e2dbbd8f7298a 100644 --- a/branches/sage/cephmds2/messages/MMDSSlaveRequest.h +++ b/branches/sage/cephmds2/messages/MMDSSlaveRequest.h @@ -38,7 +38,11 @@ class MMDSSlaveRequest : public Message { static const int OP_RENAMEGETINODE = 8; static const int OP_RENAMEGETINODEACK = -8; - static const int OP_FINISH = 17; + static const int OP_FINISH = 17; + + static const int OP_ABORT = 20; // used for recovery only + //static const int OP_COMMIT = 21; // used for recovery only + const static char *get_opname(int o) { switch (o) { @@ -58,6 +62,9 @@ class MMDSSlaveRequest : public Message { case OP_RENAMEGETINODEACK: return "rename_get_inode_ack"; case OP_FINISH: return "finish"; // commit + case OP_ABORT: return "abort"; + //case OP_COMMIT: return "commit"; + default: assert(0); return 0; } } @@ -82,6 +89,8 @@ class MMDSSlaveRequest : public Message { version_t inode_export_v; utime_t now; + bufferlist stray; // stray dir + dentry + public: metareqid_t get_reqid() { return reqid; } int get_op() { return op; } @@ -111,6 +120,7 @@ public: ::_encode(now, payload); ::_encode(inode_export, payload); ::_encode(inode_export_v, payload); + ::_encode(stray, payload); } void decode_payload() { int off = 0; @@ -125,6 +135,7 @@ public: ::_decode(now, payload, off); ::_decode(inode_export, payload, off); ::_decode(inode_export_v, payload, off); + ::_decode(stray, payload, off); } char *get_type_name() { return "slave_request"; } diff --git a/branches/sage/cephmds2/msg/Message.cc b/branches/sage/cephmds2/msg/Message.cc index 74b5c8a22b559..716fafb491719 100644 --- a/branches/sage/cephmds2/msg/Message.cc +++ b/branches/sage/cephmds2/msg/Message.cc @@ -53,6 +53,7 @@ using namespace std; #include "messages/MMDSMap.h" #include "messages/MMDSBeacon.h" #include "messages/MMDSImportMap.h" +#include "messages/MMDSResolveAck.h" #include "messages/MMDSCacheRejoin.h" //#include "messages/MMDSCacheRejoinAck.h" @@ -225,6 +226,9 @@ decode_message(msg_envelope_t& env, bufferlist& payload) case MSG_MDS_IMPORTMAP: m = new MMDSImportMap; break; + case MSG_MDS_RESOLVEACK: + m = new MMDSResolveAck; + break; case MSG_MDS_CACHEREJOIN: m = new MMDSCacheRejoin; break; diff --git a/branches/sage/cephmds2/msg/Message.h b/branches/sage/cephmds2/msg/Message.h index e3f1786133e69..6d66e713cb1fe 100644 --- a/branches/sage/cephmds2/msg/Message.h +++ b/branches/sage/cephmds2/msg/Message.h @@ -86,7 +86,9 @@ #define MSG_MDS_BEACON 105 // to monitor #define MSG_MDS_IMPORTMAP 106 -#define MSG_MDS_CACHEREJOIN 107 +#define MSG_MDS_RESOLVEACK 107 + +#define MSG_MDS_CACHEREJOIN 108 #define MSG_MDS_DISCOVER 110 #define MSG_MDS_DISCOVERREPLY 111 diff --git a/branches/sage/cephmds2/script/find_auth_pins.pl b/branches/sage/cephmds2/script/find_auth_pins.pl index c02c12922ed7b..d37fb109a48da 100755 --- a/branches/sage/cephmds2/script/find_auth_pins.pl +++ b/branches/sage/cephmds2/script/find_auth_pins.pl @@ -9,16 +9,19 @@ while (<>) { #cdir:adjust_nested_auth_pins on [dir 163 /foo/ rep@13 | child] count now 0 + 1 if (/adjust_nested_auth_pins/) { - my ($what) = /\[(\w+ \d+) /; + my ($what) = / (\w+)\]/; + $what =~ s/ 0x/ /; $hist{$what} .= "$l: $_" if defined $pin{$what}; } # cinode:auth_pin on inode [1000000002625 /gnu/blah_client_created. 0x89b7700] count now 1 + 0 - if (/auth_pin /) { - my ($what) = /\[(\w+ \d+) /; -# print "add_waiter $c $what\n"; + elsif (/auth_pin / && !/waiting/) { + #my ($what) = /\[(\w+ \w+) /; + my ($what) = / (\w+)\]/; + $what =~ s/ 0x/ /; + #print "$_ add_waiter $c $what\n"; $pin{$what}++; $hist{$what} .= "$l: $_"; push( @pins, $what ) unless grep {$_ eq $what} @pins; @@ -26,8 +29,10 @@ while (<>) { # cinode:auth_unpin on inode [1000000002625 (dangling) 0x89b7700] count now 0 + 0 - if (/auth_unpin/) { - my ($what) = /\[(\w+ \d+) /;# / on (.*\])/; + elsif (/auth_unpin/) { + #my ($what) = /\[(\w+ \w+) /;# / on (.*\])/; + my ($what) = / (\w+)\]/; + $what =~ s/ 0x/ /; $pin{$what}--; $hist{$what} .= "$l: $_"; unless ($pin{$what}) { -- 2.39.5