From: Yan, Zheng Date: Tue, 26 Nov 2013 09:19:04 +0000 (+0800) Subject: mds: send cap import messages to clients after importing subtree succeeds X-Git-Tag: v0.75~93^2~9 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9dc52ff04bebd8f2477f88514d653484bd2b150f;p=ceph.git mds: send cap import messages to clients after importing subtree succeeds When importing subtree, the importer sends cap import messages to clients before the import subtree operation is considered as success. If the exporter crashes before EExport event is journalled, the importer needs to re-export client caps. This confuses clients, and makes them lose track of auth caps. Signed-off-by: Yan, Zheng --- diff --git a/src/mds/Capability.h b/src/mds/Capability.h index c04a05285cf3..6e35ba815da9 100644 --- a/src/mds/Capability.h +++ b/src/mds/Capability.h @@ -217,7 +217,10 @@ private: ceph_seq_t mseq; int suppress; - bool stale; + unsigned state; + + const static unsigned STATE_STALE = (1<<0); + const static unsigned STATE_NEW = (1<<1); public: snapid_t client_follows; @@ -234,7 +237,7 @@ public: last_sent(0), last_issue(0), mseq(0), - suppress(0), stale(false), + suppress(0), state(0), client_follows(0), client_xattr_version(0), item_session_caps(this), item_snaprealm_caps(this) { g_num_cap++; @@ -263,8 +266,12 @@ public: void inc_suppress() { suppress++; } void dec_suppress() { suppress--; } - bool is_stale() { return stale; } - void set_stale(bool b) { stale = b; } + bool is_stale() { return state & STATE_STALE; } + void mark_stale() { state |= STATE_STALE; } + void clear_stale() { state &= ~STATE_STALE; } + bool is_new() { return state & STATE_NEW; } + void mark_new() { state |= STATE_NEW; } + void clear_new() { state &= ~STATE_NEW; } CInode *get_inode() { return inode; } client_t get_client() { return client; } diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc index 7365a47bf1f3..eb3005593324 100644 --- a/src/mds/Locker.cc +++ b/src/mds/Locker.cc @@ -1852,7 +1852,7 @@ void Locker::revoke_stale_caps(Session *session) for (xlist::iterator p = session->caps.begin(); !p.end(); ++p) { Capability *cap = *p; - cap->set_stale(true); + cap->mark_stale(); CInode *in = cap->get_inode(); int issued = cap->issued(); if (issued) { @@ -1889,7 +1889,7 @@ void Locker::resume_stale_caps(Session *session) assert(in->is_head()); if (cap->is_stale()) { dout(10) << " clearing stale flag on " << *in << dendl; - cap->set_stale(false); + cap->clear_stale(); if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS)) issue_caps(in, cap); } diff --git a/src/mds/Migrator.cc b/src/mds/Migrator.cc index 30b037cc505e..d2e97d234e63 100644 --- a/src/mds/Migrator.cc +++ b/src/mds/Migrator.cc @@ -478,6 +478,12 @@ void Migrator::handle_mds_failure_or_stop(int who) } break; + case IMPORT_FINISHING: + assert(dir); + dout(10) << "import state=finishing : finishing import on " << *dir << dendl; + import_finish(dir, true); + break; + case IMPORT_ABORTING: assert(dir); dout(10) << "import state=aborting : ignoring repeat failure " << *dir << dendl; @@ -1582,8 +1588,16 @@ void Migrator::export_logged_finish(CDir *dir) assert (g_conf->mds_kill_export_at != 11); // no notifies to wait for? - if (stat.notify_ack_waiting.empty()) + if (stat.notify_ack_waiting.empty()) { export_finish(dir); // skip notify/notify_ack stage. + } else { + // notify peer to send cap import messages to clients + if (mds->mdsmap->is_clientreplay_or_active_or_stopping(stat.peer)) { + mds->send_message_mds(new MExportDirFinish(dir->dirfrag(), false, stat.tid), stat.peer); + } else { + dout(7) << "not sending MExportDirFinish, dest has failed" << dendl; + } + } } /* @@ -1662,9 +1676,9 @@ void Migrator::export_finish(CDir *dir) // send finish/commit to new auth if (mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) { - mds->send_message_mds(new MExportDirFinish(dir->dirfrag(), it->second.tid), it->second.peer); + mds->send_message_mds(new MExportDirFinish(dir->dirfrag(), true, it->second.tid), it->second.peer); } else { - dout(7) << "not sending MExportDirFinish, dest has failed" << dendl; + dout(7) << "not sending MExportDirFinish last, dest has failed" << dendl; } assert(g_conf->mds_kill_export_at != 13); @@ -2264,20 +2278,26 @@ void Migrator::import_reverse(CDir *dir) } } - // reexport caps - for (map >::iterator p = stat.peer_exports.begin(); - p != stat.peer_exports.end(); - ++p) { - CInode *in = p->first; - dout(20) << " reexporting caps on " << *in << dendl; - /* - * bleh.. just export all caps for this inode. the auth mds - * will pick them up during recovery. - */ - bufferlist bl; // throw this away - map exported_client_map; // throw this away too - encode_export_inode_caps(in, bl, exported_client_map); - finish_export_inode_caps(in); + if (stat.state == IMPORT_ACKING) { + // remove imported caps + for (map >::iterator p = stat.peer_exports.begin(); + p != stat.peer_exports.end(); + ++p) { + CInode *in = p->first; + for (map::iterator q = p->second.begin(); + q != p->second.end(); + ++q) { + Capability *cap = in->get_client_cap(q->first); + if (cap->is_new()) + in->remove_client_cap(q->first); + } + } + for (map::iterator p = stat.client_map.begin(); + p != stat.client_map.end(); + ++p) { + Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->first.v)); + session->dec_importing(); + } } // log our failure @@ -2381,15 +2401,15 @@ void Migrator::import_logged_start(dirfrag_t df, CDir *dir, int from, assert (g_conf->mds_kill_import_at != 7); // force open client sessions and finish cap import - mds->server->finish_force_open_sessions(imported_client_map, sseqmap); + mds->server->finish_force_open_sessions(imported_client_map, sseqmap, false); + it->second.client_map.swap(imported_client_map); map > imported_caps; - for (map >::iterator p = it->second.peer_exports.begin(); p != it->second.peer_exports.end(); ++p) { - - finish_import_inode_caps(p->first, from, true, p->second, imported_caps[p->first->ino()]); + // parameter 'peer' is -1, delay sending cap import messages to client + finish_import_inode_caps(p->first, -1, true, p->second, imported_caps[p->first->ino()]); } // send notify's etc. @@ -2412,24 +2432,62 @@ void Migrator::handle_export_finish(MExportDirFinish *m) { CDir *dir = cache->get_dirfrag(m->get_dirfrag()); assert(dir); - dout(7) << "handle_export_finish on " << *dir << dendl; + dout(7) << "handle_export_finish on " << *dir << (m->is_last() ? " last" : "") << dendl; map::iterator it = import_state.find(m->get_dirfrag()); assert(it != import_state.end()); - assert(it->second.state == IMPORT_ACKING); assert(it->second.tid == m->get_tid()); - import_finish(dir, false); + import_finish(dir, false, m->is_last()); + m->put(); } -void Migrator::import_finish(CDir *dir, bool notify) +void Migrator::import_finish(CDir *dir, bool notify, bool last) { dout(7) << "import_finish on " << *dir << dendl; + map::iterator it = import_state.find(dir->dirfrag()); + assert(it != import_state.end()); + assert(it->second.state == IMPORT_ACKING || it->second.state == IMPORT_FINISHING); + // log finish assert(g_conf->mds_kill_import_at != 9); + if (it->second.state == IMPORT_ACKING) { + for (map >::iterator p = it->second.peer_exports.begin(); + p != it->second.peer_exports.end(); + ++p) { + CInode *in = p->first; + assert(in->is_auth()); + for (map::iterator q = p->second.begin(); + q != p->second.end(); + ++q) { + Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v)); + assert(session); + Capability *cap = in->get_client_cap(q->first); + assert(cap); + cap->clear_new(); + cap->merge(q->second, true); + mds->mdcache->do_cap_import(session, in, cap, q->second.cap_id, q->second.seq, + q->second.mseq - 1, it->second.peer, CEPH_CAP_FLAG_AUTH); + } + p->second.clear(); + } + for (map::iterator p = it->second.client_map.begin(); + p != it->second.client_map.end(); + ++p) { + Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->first.v)); + session->dec_importing(); + } + } + + if (!last) { + assert(it->second.state == IMPORT_ACKING); + it->second.state = IMPORT_FINISHING; + return; + } + // clear updated scatterlocks /* for (list::iterator p = import_updated_scatterlocks[dir].begin(); @@ -2448,10 +2506,10 @@ void Migrator::import_finish(CDir *dir, bool notify) import_remove_pins(dir, bounds); map > peer_exports; - import_state[dir->dirfrag()].peer_exports.swap(peer_exports); + it->second.peer_exports.swap(peer_exports); // clear import state (we're done!) - import_state.erase(dir->dirfrag()); + import_state.erase(it); mds->mdlog->start_submit_entry(new EImportFinish(dir, true)); @@ -2582,6 +2640,8 @@ void Migrator::finish_import_inode_caps(CInode *in, int peer, bool auth_cap, Capability *cap = in->get_client_cap(it->first); if (!cap) { cap = in->add_client_cap(it->first, session); + if (peer < 0) + cap->mark_new(); } Capability::Import& im = import_map[it->first]; @@ -2589,10 +2649,12 @@ void Migrator::finish_import_inode_caps(CInode *in, int peer, bool auth_cap, im.mseq = auth_cap ? it->second.mseq : cap->get_mseq(); im.issue_seq = cap->get_last_seq() + 1; - cap->merge(it->second, auth_cap); - mds->mdcache->do_cap_import(session, in, cap, it->second.cap_id, - it->second.seq, it->second.mseq - 1, peer, - auth_cap ? CEPH_CAP_FLAG_AUTH : 0); + if (peer >= 0) { + cap->merge(it->second, auth_cap); + mds->mdcache->do_cap_import(session, in, cap, it->second.cap_id, + it->second.seq, it->second.mseq - 1, peer, + auth_cap ? CEPH_CAP_FLAG_AUTH : 0); + } } in->replica_caps_wanted = 0; diff --git a/src/mds/Migrator.h b/src/mds/Migrator.h index 58dce1cb67a2..9007a85019df 100644 --- a/src/mds/Migrator.h +++ b/src/mds/Migrator.h @@ -113,7 +113,8 @@ public: const static int IMPORT_PREPPED = 4; // opened bounds, waiting for import const static int IMPORT_LOGGINGSTART = 5; // got import, logging EImportStart const static int IMPORT_ACKING = 6; // logged EImportStart, sent ack, waiting for finish - const static int IMPORT_ABORTING = 7; // notifying bystanders of an abort before unfreezing + const static int IMPORT_FINISHING = 7; // sent cap imports, waiting for finish + const static int IMPORT_ABORTING = 8; // notifying bystanders of an abort before unfreezing static const char *get_import_statename(int s) { switch (s) { case IMPORT_DISCOVERING: return "discovering"; @@ -122,6 +123,7 @@ public: case IMPORT_PREPPED: return "prepped"; case IMPORT_LOGGINGSTART: return "loggingstart"; case IMPORT_ACKING: return "acking"; + case IMPORT_FINISHING: return "finishing"; case IMPORT_ABORTING: return "aborting"; default: assert(0); return 0; } @@ -135,6 +137,7 @@ protected: set bystanders; list bound_ls; list updated_scatterlocks; + map client_map; map > peer_exports; }; @@ -332,7 +335,7 @@ protected: map& sseqmap); void handle_export_finish(MExportDirFinish *m); public: - void import_finish(CDir *dir, bool notify); + void import_finish(CDir *dir, bool notify, bool last=true); protected: void handle_export_caps(MExportCaps *m); diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 8242d226ff57..a34b082e033f 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -372,7 +372,8 @@ version_t Server::prepare_force_open_sessions(map& cm, } void Server::finish_force_open_sessions(map& cm, - map& sseqmap) + map& sseqmap, + bool dec_import) { /* * FIXME: need to carefully consider the race conditions between a @@ -403,7 +404,8 @@ void Server::finish_force_open_sessions(map& cm, dout(10) << "force_open_sessions skipping already-open " << session->info.inst << dendl; assert(session->is_open() || session->is_stale()); } - session->dec_importing(); + if (dec_import) + session->dec_importing(); } mds->sessionmap.version++; } diff --git a/src/mds/Server.h b/src/mds/Server.h index 091d3d20e293..d7253f1f9da6 100644 --- a/src/mds/Server.h +++ b/src/mds/Server.h @@ -78,7 +78,8 @@ public: version_t prepare_force_open_sessions(map &cm, map& sseqmap); void finish_force_open_sessions(map &cm, - map& sseqmap); + map& sseqmap, + bool dec_import=true); void flush_client_sessions(set& client_set, C_GatherBuilder& gather); void finish_flush_session(Session *session, version_t seq); void terminate_sessions(); diff --git a/src/messages/MExportDirFinish.h b/src/messages/MExportDirFinish.h index 66d6f27eae26..dd78dda538c8 100644 --- a/src/messages/MExportDirFinish.h +++ b/src/messages/MExportDirFinish.h @@ -19,13 +19,15 @@ class MExportDirFinish : public Message { dirfrag_t dirfrag; + bool last; public: dirfrag_t get_dirfrag() { return dirfrag; } + bool is_last() { return last; } MExportDirFinish() {} - MExportDirFinish(dirfrag_t df, uint64_t tid) : - Message(MSG_MDS_EXPORTDIRFINISH), dirfrag(df) { + MExportDirFinish(dirfrag_t df, bool l, uint64_t tid) : + Message(MSG_MDS_EXPORTDIRFINISH), dirfrag(df), last(l) { set_tid(tid); } private: @@ -34,15 +36,17 @@ private: public: const char *get_type_name() const { return "ExFin"; } void print(ostream& o) const { - o << "export_finish(" << dirfrag << ")"; + o << "export_finish(" << dirfrag << (last ? " last" : "") << ")"; } void encode_payload(uint64_t features) { ::encode(dirfrag, payload); + ::encode(last, payload); } void decode_payload() { bufferlist::iterator p = payload.begin(); ::decode(dirfrag, p); + ::decode(last, p); } };