From: Sage Weil Date: Tue, 23 Mar 2010 21:54:13 +0000 (-0700) Subject: mds: fix up client session importing X-Git-Tag: v0.20~151^2~13 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=41f6adc9d42855a3ecdbf106e0b5217a6f672bfe;p=ceph.git mds: fix up client session importing Keep import counter for each session, for overlapping imports. Prevent a session close or kill during an import. --- diff --git a/src/TODO b/src/TODO index 5db30e3163d6..96e03b507c01 100644 --- a/src/TODO +++ b/src/TODO @@ -60,7 +60,6 @@ filestore - need an osdmap cache layer? bugs -- mds prepare_force_open_sessions, then import aborts.. session is still OPENING but no client_session is sent... - rm -r failure (on kernel tree) - dbench 1, restart mds (may take a few times), dbench will error out. @@ -369,4 +368,4 @@ radosgw -- for nicer kclient debug output (everything but messenger, but including msg in/out) -echo 'module ceph +p' > /sys/kernel/debug/dynamic_debug/control ; echo 'file fs/ceph/messenger.c -p' > /sys/kernel/debug/dynamic_debug/control ; echo 'file ' `grep -- --- /sys/kernel/debug/dynamic_debug/control | grep ceph | awk '{print $1}' | sed 's/:/ line /'` +p > /sys/kernel/debug/dynamic_debug/control ; echo 'file ' `grep -- === /sys/kernel/debug/dynamic_debug/control | grep ceph | awk '{print $1}' | sed 's/:/ line /'` +p > /sys/kernel/debug/dynamic_debug/control \ No newline at end of file +echo 'module ceph +p' > /sys/kernel/debug/dynamic_debug/control ; echo 'file fs/ceph/messenger.c -p' > /sys/kernel/debug/dynamic_debug/control ; echo 'file ' `grep -- --- /sys/kernel/debug/dynamic_debug/control | grep ceph | awk '{print $1}' | sed 's/:/ line /'` +p > /sys/kernel/debug/dynamic_debug/control ; echo 'file ' `grep -- === /sys/kernel/debug/dynamic_debug/control | grep ceph | awk '{print $1}' | sed 's/:/ line /'` +p > /sys/kernel/debug/dynamic_debug/control diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h index b43468e9a8e4..755c7116c920 100644 --- a/src/mds/MDCache.h +++ b/src/mds/MDCache.h @@ -305,6 +305,7 @@ struct MDRequest : public Mutation { bool was_link_merge; map imported_client_map; + map sseq_map; map > cap_imports; // for snaps diff --git a/src/mds/Migrator.cc b/src/mds/Migrator.cc index f59a7fb64a09..f92de726c8ee 100644 --- a/src/mds/Migrator.cc +++ b/src/mds/Migrator.cc @@ -1648,12 +1648,13 @@ class C_MDS_ImportDirLoggedStart : public Context { int from; public: map imported_client_map; + map sseqmap; C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, int f) : migrator(m), dir(d), from(f) { } void finish(int r) { - migrator->import_logged_start(dir, from, imported_client_map); + migrator->import_logged_start(dir, from, imported_client_map, sseqmap); } }; @@ -1686,7 +1687,7 @@ void Migrator::handle_export_dir(MExportDir *m) bufferlist::iterator cmp = m->client_map.begin(); ::decode(onlogged->imported_client_map, cmp); assert(cmp.end()); - le->cmapv = mds->server->prepare_force_open_sessions(onlogged->imported_client_map); + le->cmapv = mds->server->prepare_force_open_sessions(onlogged->imported_client_map, onlogged->sseqmap); le->client_map.claim(m->client_map); bufferlist::iterator blp = m->export_data.begin(); @@ -1910,7 +1911,8 @@ void Migrator::import_reverse_final(CDir *dir) void Migrator::import_logged_start(CDir *dir, int from, - map& imported_client_map) + map& imported_client_map, + map& sseqmap) { dout(7) << "import_logged " << *dir << dendl; @@ -1920,7 +1922,7 @@ void Migrator::import_logged_start(CDir *dir, int from, assert (g_conf.mds_kill_import_at != 7); // force open client sessions and finish cap import - mds->server->finish_force_open_sessions(imported_client_map); + mds->server->finish_force_open_sessions(imported_client_map, sseqmap); for (map >::iterator p = import_caps[dir].begin(); p != import_caps[dir].end(); @@ -2307,10 +2309,12 @@ class C_M_LoggedImportCaps : public Context { int from; public: map > cap_imports; + map client_map; + map sseqmap; C_M_LoggedImportCaps(Migrator *m, CInode *i, int f) : migrator(m), in(i), from(f) {} void finish(int r) { - migrator->logged_import_caps(in, from, cap_imports); + migrator->logged_import_caps(in, from, cap_imports, client_map, sseqmap); } }; @@ -2326,6 +2330,8 @@ void Migrator::handle_export_caps(MExportCaps *ex) */ C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(this, in, ex->get_source().num()); + finish->client_map.swap(ex->client_map); + ESessions *le = new ESessions(++mds->sessionmap.projected); mds->mdlog->start_entry(le); @@ -2335,8 +2341,8 @@ void Migrator::handle_export_caps(MExportCaps *ex) assert(!finish->cap_imports.empty()); // thus, inode is pinned. // journal open client sessions - mds->server->prepare_force_open_sessions(ex->client_map); - le->client_map.swap(ex->client_map); + mds->server->prepare_force_open_sessions(finish->client_map, finish->sseqmap); + le->client_map = finish->client_map; mds->mdlog->submit_entry(le); mds->mdlog->wait_for_safe(finish); @@ -2348,9 +2354,15 @@ void Migrator::handle_export_caps(MExportCaps *ex) void Migrator::logged_import_caps(CInode *in, int from, - map >& cap_imports) + map >& cap_imports, + map& client_map, + map& sseqmap) { dout(10) << "logged_import_caps on " << *in << dendl; + + // force open client sessions and finish cap import + mds->server->finish_force_open_sessions(client_map, sseqmap); + assert(cap_imports.count(in)); finish_import_inode_caps(in, from, cap_imports[in]); diff --git a/src/mds/Migrator.h b/src/mds/Migrator.h index 3debf202aa9e..5d6df53bcc2a 100644 --- a/src/mds/Migrator.h +++ b/src/mds/Migrator.h @@ -252,7 +252,8 @@ protected: void import_reverse_final(CDir *dir); void import_notify_abort(CDir *dir, set& bounds); void import_logged_start(CDir *dir, int from, - map &imported_client_map); + map &imported_client_map, + map& sseqmap); void handle_export_finish(MExportDirFinish *m); public: void import_finish(CDir *dir); @@ -261,7 +262,9 @@ protected: void handle_export_caps(MExportCaps *m); void logged_import_caps(CInode *in, int from, - map >& cap_imports); + map >& cap_imports, + map& client_map, + map& sseqmap); friend class C_MDS_ImportDirLoggedStart; diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 2427e2101066..cf33c147da33 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -229,6 +229,10 @@ void Server::handle_client_session(MClientSession *m) dout(10) << "already closed|closing|killing, dropping this req" << dendl; return; } + if (session->is_importing()) { + dout(10) << "ignoring close req on importing session" << dendl; + return; + } assert(session->is_open() || session->is_stale() || session->is_opening()); @@ -323,7 +327,8 @@ void Server::_session_logged(Session *session, __u64 state_seq, bool open, versi mds->sessionmap.version++; // noop } -version_t Server::prepare_force_open_sessions(map& cm) +version_t Server::prepare_force_open_sessions(map& cm, + map& sseqmap) { version_t pv = ++mds->sessionmap.projected; dout(10) << "prepare_force_open_sessions " << pv @@ -334,13 +339,19 @@ version_t Server::prepare_force_open_sessions(map& cm) if (session->is_closed() || session->is_closing() || session->is_killing()) - mds->sessionmap.set_state(session, Session::STATE_OPENING); + sseqmap[p->first] = mds->sessionmap.set_state(session, Session::STATE_OPENING); + else + assert(session->is_open() || + session->is_opening() || + session->is_stale()); + session->inc_importing(); mds->sessionmap.touch_session(session); } return pv; } -void Server::finish_force_open_sessions(map& cm) +void Server::finish_force_open_sessions(map& cm, + map& sseqmap) { /* * FIXME: need to carefully consider the race conditions between a @@ -352,11 +363,21 @@ void Server::finish_force_open_sessions(map& cm) for (map::iterator p = cm.begin(); p != cm.end(); ++p) { Session *session = mds->sessionmap.get_session(p->second.name); assert(session); - if (session->is_opening()) { - dout(10) << "force_open_sessions opening " << session->inst << dendl; - mds->sessionmap.set_state(session, Session::STATE_OPEN); - mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->inst); + + if (sseqmap.count(p->first)) { + __u64 sseq = sseqmap[p->first]; + if (session->get_state_seq() != sseq) { + dout(10) << "force_open_sessions skipping changed " << session->inst << dendl; + } else { + dout(10) << "force_open_sessions opened " << session->inst << dendl; + mds->sessionmap.set_state(session, Session::STATE_OPEN); + mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->inst); + } + } else { + dout(10) << "force_open_sessions skipping already-open " << session->inst << dendl; + assert(session->is_open() || session->is_stale()); } + session->dec_importing(); } mds->sessionmap.version++; } @@ -428,13 +449,17 @@ void Server::find_idle_sessions() Session *session = mds->sessionmap.get_oldest_session(Session::STATE_STALE); if (!session) break; + if (session->is_importing()) { + dout(10) << "stopping at importing session " << session->inst << dendl; + break; + } assert(session->is_stale()); if (session->last_cap_renew >= cutoff) { dout(20) << "oldest stale session is " << session->inst << " and sufficiently new (" << session->last_cap_renew << ")" << dendl; break; } - + stringstream ss; utime_t age = now; age -= session->last_cap_renew; @@ -448,9 +473,10 @@ void Server::find_idle_sessions() void Server::kill_session(Session *session) { - if (session->is_opening() || - session->is_open() || - session->is_stale()) { + if ((session->is_opening() || + session->is_open() || + session->is_stale()) && + !session->is_importing()) { dout(10) << "kill_session " << session << dendl; __u64 sseq = mds->sessionmap.set_state(session, Session::STATE_KILLING); version_t pv = ++mds->sessionmap.projected; @@ -458,10 +484,11 @@ void Server::kill_session(Session *session) new C_MDS_session_finish(mds, session, sseq, false, pv)); mdlog->flush(); } else { - dout(10) << "kill_session already closing/killing " << session << dendl; + dout(10) << "kill_session importing or already closing/killing " << session << dendl; assert(session->is_closing() || session->is_closed() || - session->is_killing()); + session->is_killing() || + session->is_importing()); } } @@ -4343,7 +4370,7 @@ version_t Server::_rename_prepare_import(MDRequest *mdr, CDentry *srcdn, bufferl // imported caps ::decode(mdr->more()->imported_client_map, blp); ::encode(mdr->more()->imported_client_map, *client_map_bl); - prepare_force_open_sessions(mdr->more()->imported_client_map); + prepare_force_open_sessions(mdr->more()->imported_client_map, mdr->more()->sseq_map); list updated_scatterlocks; // we clear_updated explicitly below mdcache->migrator->decode_import_inode(srcdn, blp, @@ -4640,7 +4667,7 @@ void Server::_rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDen assert(mdr->more()->inode_import.length() > 0); // finish cap imports - finish_force_open_sessions(mdr->more()->imported_client_map); + finish_force_open_sessions(mdr->more()->imported_client_map, mdr->more()->sseq_map); if (mdr->more()->cap_imports.count(destdnl->get_inode())) mds->mdcache->migrator->finish_import_inode_caps(destdnl->get_inode(), srcdn->authority().first, mdr->more()->cap_imports[destdnl->get_inode()]); diff --git a/src/mds/Server.h b/src/mds/Server.h index d830c6dffe75..bf2774824104 100644 --- a/src/mds/Server.h +++ b/src/mds/Server.h @@ -71,8 +71,10 @@ public: void handle_client_session(class MClientSession *m); void _session_logged(Session *session, __u64 state_seq, bool open, version_t pv, interval_set& inos,version_t piv); - version_t prepare_force_open_sessions(map &cm); - void finish_force_open_sessions(map &cm); + version_t prepare_force_open_sessions(map &cm, + map& sseqmap); + void finish_force_open_sessions(map &cm, + map& sseqmap); void terminate_sessions(); void find_idle_sessions(); void kill_session(Session *session); diff --git a/src/mds/SessionMap.h b/src/mds/SessionMap.h index e866f1853038..515050316344 100644 --- a/src/mds/SessionMap.h +++ b/src/mds/SessionMap.h @@ -44,16 +44,15 @@ class Session : public RefCountedObject { public: /* - <-- closed <------------+--------+ - ^ | | | - | v | | - killing <-- opening <---+ | | - ^ | | | | | - | v | | | | - stale <--> open --> closing ---+ | - | ^| | | | - | |v v v | - +--------> importing <----------------+ + <-- closed <------------+ + ^ | | + | v | + killing <-- opening <----+ | + ^ | | | + | v | | + stale <--> open --> closing ---+ + + + additional dimension of 'importing' (with counter) */ static const int STATE_CLOSED = 0; @@ -62,7 +61,6 @@ public: static const int STATE_CLOSING = 3; // journaling close static const int STATE_STALE = 4; static const int STATE_KILLING = 5; - static const int STATE_IMPORTING = 6; const char *get_state_name(int s) { switch (s) { @@ -72,7 +70,6 @@ public: case STATE_CLOSING: return "closing"; case STATE_STALE: return "stale"; case STATE_KILLING: return "killing"; - case STATE_IMPORTING: return "importing"; default: return "???"; } } @@ -80,6 +77,7 @@ public: private: int state; __u64 state_seq; + int importing_count; friend class SessionMap; public: entity_inst_t inst; @@ -128,6 +126,15 @@ public: bool is_stale() { return state == STATE_STALE; } bool is_killing() { return state == STATE_KILLING; } + void inc_importing() { + ++importing_count; + } + void dec_importing() { + assert(importing_count); + --importing_count; + } + bool is_importing() { return importing_count > 0; } + // -- caps -- private: version_t cap_push_seq; // cap push seq # @@ -167,7 +174,7 @@ public: Session() : - state(STATE_CLOSED), state_seq(0), + state(STATE_CLOSED), state_seq(0), importing_count(0), item_session_list(this), requests(0), // member_offset passed to front() manually cap_push_seq(0) { }