Keep import counter for each session, for overlapping imports.
Prevent a session close or kill during an import.
- need an osdmap cache layer?
bugs
-- mds prepare_force_open_sessions, then import aborts.. session is still OPENING but no client_session is sent...
- rm -r failure (on kernel tree)
- dbench 1, restart mds (may take a few times), dbench will error out.
-- for nicer kclient debug output (everything but messenger, but including msg in/out)
-echo 'module ceph +p' > /sys/kernel/debug/dynamic_debug/control ; echo 'file fs/ceph/messenger.c -p' > /sys/kernel/debug/dynamic_debug/control ; echo 'file ' `grep -- --- /sys/kernel/debug/dynamic_debug/control | grep ceph | awk '{print $1}' | sed 's/:/ line /'` +p > /sys/kernel/debug/dynamic_debug/control ; echo 'file ' `grep -- === /sys/kernel/debug/dynamic_debug/control | grep ceph | awk '{print $1}' | sed 's/:/ line /'` +p > /sys/kernel/debug/dynamic_debug/control
\ No newline at end of file
+echo 'module ceph +p' > /sys/kernel/debug/dynamic_debug/control ; echo 'file fs/ceph/messenger.c -p' > /sys/kernel/debug/dynamic_debug/control ; echo 'file ' `grep -- --- /sys/kernel/debug/dynamic_debug/control | grep ceph | awk '{print $1}' | sed 's/:/ line /'` +p > /sys/kernel/debug/dynamic_debug/control ; echo 'file ' `grep -- === /sys/kernel/debug/dynamic_debug/control | grep ceph | awk '{print $1}' | sed 's/:/ line /'` +p > /sys/kernel/debug/dynamic_debug/control
bool was_link_merge;
map<client_t,entity_inst_t> imported_client_map;
+ map<client_t,__u64> sseq_map;
map<CInode*, map<client_t,Capability::Export> > cap_imports;
// for snaps
int from;
public:
map<client_t,entity_inst_t> imported_client_map;
+ map<client_t,__u64> sseqmap;
C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, int f) :
migrator(m), dir(d), from(f) {
}
void finish(int r) {
- migrator->import_logged_start(dir, from, imported_client_map);
+ migrator->import_logged_start(dir, from, imported_client_map, sseqmap);
}
};
bufferlist::iterator cmp = m->client_map.begin();
::decode(onlogged->imported_client_map, cmp);
assert(cmp.end());
- le->cmapv = mds->server->prepare_force_open_sessions(onlogged->imported_client_map);
+ le->cmapv = mds->server->prepare_force_open_sessions(onlogged->imported_client_map, onlogged->sseqmap);
le->client_map.claim(m->client_map);
bufferlist::iterator blp = m->export_data.begin();
void Migrator::import_logged_start(CDir *dir, int from,
- map<client_t,entity_inst_t>& imported_client_map)
+ map<client_t,entity_inst_t>& imported_client_map,
+ map<client_t,__u64>& sseqmap)
{
dout(7) << "import_logged " << *dir << dendl;
assert (g_conf.mds_kill_import_at != 7);
// force open client sessions and finish cap import
- mds->server->finish_force_open_sessions(imported_client_map);
+ mds->server->finish_force_open_sessions(imported_client_map, sseqmap);
for (map<CInode*, map<client_t,Capability::Export> >::iterator p = import_caps[dir].begin();
p != import_caps[dir].end();
int from;
public:
map<CInode*, map<client_t,Capability::Export> > cap_imports;
+ map<client_t,entity_inst_t> client_map;
+ map<client_t,__u64> sseqmap;
C_M_LoggedImportCaps(Migrator *m, CInode *i, int f) : migrator(m), in(i), from(f) {}
void finish(int r) {
- migrator->logged_import_caps(in, from, cap_imports);
+ migrator->logged_import_caps(in, from, cap_imports, client_map, sseqmap);
}
};
*/
C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(this, in, ex->get_source().num());
+ finish->client_map.swap(ex->client_map);
+
ESessions *le = new ESessions(++mds->sessionmap.projected);
mds->mdlog->start_entry(le);
assert(!finish->cap_imports.empty()); // thus, inode is pinned.
// journal open client sessions
- mds->server->prepare_force_open_sessions(ex->client_map);
- le->client_map.swap(ex->client_map);
+ mds->server->prepare_force_open_sessions(finish->client_map, finish->sseqmap);
+ le->client_map = finish->client_map;
mds->mdlog->submit_entry(le);
mds->mdlog->wait_for_safe(finish);
void Migrator::logged_import_caps(CInode *in,
int from,
- map<CInode*, map<client_t,Capability::Export> >& cap_imports)
+ map<CInode*, map<client_t,Capability::Export> >& cap_imports,
+ map<client_t,entity_inst_t>& client_map,
+ map<client_t,__u64>& sseqmap)
{
dout(10) << "logged_import_caps on " << *in << dendl;
+
+ // force open client sessions and finish cap import
+ mds->server->finish_force_open_sessions(client_map, sseqmap);
+
assert(cap_imports.count(in));
finish_import_inode_caps(in, from, cap_imports[in]);
void import_reverse_final(CDir *dir);
void import_notify_abort(CDir *dir, set<CDir*>& bounds);
void import_logged_start(CDir *dir, int from,
- map<client_t,entity_inst_t> &imported_client_map);
+ map<client_t,entity_inst_t> &imported_client_map,
+ map<client_t,__u64>& sseqmap);
void handle_export_finish(MExportDirFinish *m);
public:
void import_finish(CDir *dir);
void handle_export_caps(MExportCaps *m);
void logged_import_caps(CInode *in,
int from,
- map<CInode*, map<client_t,Capability::Export> >& cap_imports);
+ map<CInode*, map<client_t,Capability::Export> >& cap_imports,
+ map<client_t,entity_inst_t>& client_map,
+ map<client_t,__u64>& sseqmap);
friend class C_MDS_ImportDirLoggedStart;
dout(10) << "already closed|closing|killing, dropping this req" << dendl;
return;
}
+ if (session->is_importing()) {
+ dout(10) << "ignoring close req on importing session" << dendl;
+ return;
+ }
assert(session->is_open() ||
session->is_stale() ||
session->is_opening());
mds->sessionmap.version++; // noop
}
-version_t Server::prepare_force_open_sessions(map<client_t,entity_inst_t>& cm)
+version_t Server::prepare_force_open_sessions(map<client_t,entity_inst_t>& cm,
+ map<client_t,__u64>& sseqmap)
{
version_t pv = ++mds->sessionmap.projected;
dout(10) << "prepare_force_open_sessions " << pv
if (session->is_closed() ||
session->is_closing() ||
session->is_killing())
- mds->sessionmap.set_state(session, Session::STATE_OPENING);
+ sseqmap[p->first] = mds->sessionmap.set_state(session, Session::STATE_OPENING);
+ else
+ assert(session->is_open() ||
+ session->is_opening() ||
+ session->is_stale());
+ session->inc_importing();
mds->sessionmap.touch_session(session);
}
return pv;
}
-void Server::finish_force_open_sessions(map<client_t,entity_inst_t>& cm)
+void Server::finish_force_open_sessions(map<client_t,entity_inst_t>& cm,
+ map<client_t,__u64>& sseqmap)
{
/*
* FIXME: need to carefully consider the race conditions between a
for (map<client_t,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
Session *session = mds->sessionmap.get_session(p->second.name);
assert(session);
- if (session->is_opening()) {
- dout(10) << "force_open_sessions opening " << session->inst << dendl;
- mds->sessionmap.set_state(session, Session::STATE_OPEN);
- mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->inst);
+
+ if (sseqmap.count(p->first)) {
+ __u64 sseq = sseqmap[p->first];
+ if (session->get_state_seq() != sseq) {
+ dout(10) << "force_open_sessions skipping changed " << session->inst << dendl;
+ } else {
+ dout(10) << "force_open_sessions opened " << session->inst << dendl;
+ mds->sessionmap.set_state(session, Session::STATE_OPEN);
+ mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->inst);
+ }
+ } else {
+ dout(10) << "force_open_sessions skipping already-open " << session->inst << dendl;
+ assert(session->is_open() || session->is_stale());
}
+ session->dec_importing();
}
mds->sessionmap.version++;
}
Session *session = mds->sessionmap.get_oldest_session(Session::STATE_STALE);
if (!session)
break;
+ if (session->is_importing()) {
+ dout(10) << "stopping at importing session " << session->inst << dendl;
+ break;
+ }
assert(session->is_stale());
if (session->last_cap_renew >= cutoff) {
dout(20) << "oldest stale session is " << session->inst << " and sufficiently new ("
<< session->last_cap_renew << ")" << dendl;
break;
}
-
+
stringstream ss;
utime_t age = now;
age -= session->last_cap_renew;
void Server::kill_session(Session *session)
{
- if (session->is_opening() ||
- session->is_open() ||
- session->is_stale()) {
+ if ((session->is_opening() ||
+ session->is_open() ||
+ session->is_stale()) &&
+ !session->is_importing()) {
dout(10) << "kill_session " << session << dendl;
__u64 sseq = mds->sessionmap.set_state(session, Session::STATE_KILLING);
version_t pv = ++mds->sessionmap.projected;
new C_MDS_session_finish(mds, session, sseq, false, pv));
mdlog->flush();
} else {
- dout(10) << "kill_session already closing/killing " << session << dendl;
+ dout(10) << "kill_session importing or already closing/killing " << session << dendl;
assert(session->is_closing() ||
session->is_closed() ||
- session->is_killing());
+ session->is_killing() ||
+ session->is_importing());
}
}
// imported caps
::decode(mdr->more()->imported_client_map, blp);
::encode(mdr->more()->imported_client_map, *client_map_bl);
- prepare_force_open_sessions(mdr->more()->imported_client_map);
+ prepare_force_open_sessions(mdr->more()->imported_client_map, mdr->more()->sseq_map);
list<ScatterLock*> updated_scatterlocks; // we clear_updated explicitly below
mdcache->migrator->decode_import_inode(srcdn, blp,
assert(mdr->more()->inode_import.length() > 0);
// finish cap imports
- finish_force_open_sessions(mdr->more()->imported_client_map);
+ finish_force_open_sessions(mdr->more()->imported_client_map, mdr->more()->sseq_map);
if (mdr->more()->cap_imports.count(destdnl->get_inode()))
mds->mdcache->migrator->finish_import_inode_caps(destdnl->get_inode(), srcdn->authority().first,
mdr->more()->cap_imports[destdnl->get_inode()]);
void handle_client_session(class MClientSession *m);
void _session_logged(Session *session, __u64 state_seq,
bool open, version_t pv, interval_set<inodeno_t>& inos,version_t piv);
- version_t prepare_force_open_sessions(map<client_t,entity_inst_t> &cm);
- void finish_force_open_sessions(map<client_t,entity_inst_t> &cm);
+ version_t prepare_force_open_sessions(map<client_t,entity_inst_t> &cm,
+ map<client_t,__u64>& sseqmap);
+ void finish_force_open_sessions(map<client_t,entity_inst_t> &cm,
+ map<client_t,__u64>& sseqmap);
void terminate_sessions();
void find_idle_sessions();
void kill_session(Session *session);
public:
/*
- <deleted> <-- closed <------------+--------+
- ^ | | |
- | v | |
- killing <-- opening <---+ | |
- ^ | | | | |
- | v | | | |
- stale <--> open --> closing ---+ |
- | ^| | | |
- | |v v v |
- +--------> importing <----------------+
+ <deleted> <-- closed <------------+
+ ^ | |
+ | v |
+ killing <-- opening <----+ |
+ ^ | | |
+ | v | |
+ stale <--> open --> closing ---+
+
+ + additional dimension of 'importing' (with counter)
*/
static const int STATE_CLOSED = 0;
static const int STATE_CLOSING = 3; // journaling close
static const int STATE_STALE = 4;
static const int STATE_KILLING = 5;
- static const int STATE_IMPORTING = 6;
const char *get_state_name(int s) {
switch (s) {
case STATE_CLOSING: return "closing";
case STATE_STALE: return "stale";
case STATE_KILLING: return "killing";
- case STATE_IMPORTING: return "importing";
default: return "???";
}
}
private:
int state;
__u64 state_seq;
+ int importing_count;
friend class SessionMap;
public:
entity_inst_t inst;
bool is_stale() { return state == STATE_STALE; }
bool is_killing() { return state == STATE_KILLING; }
+ void inc_importing() {
+ ++importing_count;
+ }
+ void dec_importing() {
+ assert(importing_count);
+ --importing_count;
+ }
+ bool is_importing() { return importing_count > 0; }
+
// -- caps --
private:
version_t cap_push_seq; // cap push seq #
Session() :
- state(STATE_CLOSED), state_seq(0),
+ state(STATE_CLOSED), state_seq(0), importing_count(0),
item_session_list(this),
requests(0), // member_offset passed to front() manually
cap_push_seq(0) { }