}
break;
+ case IMPORT_FINISHING:
+ assert(dir);
+ dout(10) << "import state=finishing : finishing import on " << *dir << dendl;
+ import_finish(dir, true);
+ break;
+
case IMPORT_ABORTING:
assert(dir);
dout(10) << "import state=aborting : ignoring repeat failure " << *dir << dendl;
assert (g_conf->mds_kill_export_at != 11);
// no notifies to wait for?
- if (stat.notify_ack_waiting.empty())
+ if (stat.notify_ack_waiting.empty()) {
export_finish(dir); // skip notify/notify_ack stage.
+ } else {
+ // notify peer to send cap import messages to clients
+ if (mds->mdsmap->is_clientreplay_or_active_or_stopping(stat.peer)) {
+ mds->send_message_mds(new MExportDirFinish(dir->dirfrag(), false, stat.tid), stat.peer);
+ } else {
+ dout(7) << "not sending MExportDirFinish, dest has failed" << dendl;
+ }
+ }
}
/*
// send finish/commit to new auth
if (mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) {
- mds->send_message_mds(new MExportDirFinish(dir->dirfrag(), it->second.tid), it->second.peer);
+ mds->send_message_mds(new MExportDirFinish(dir->dirfrag(), true, it->second.tid), it->second.peer);
} else {
- dout(7) << "not sending MExportDirFinish, dest has failed" << dendl;
+ dout(7) << "not sending MExportDirFinish last, dest has failed" << dendl;
}
assert(g_conf->mds_kill_export_at != 13);
}
}
- // reexport caps
- for (map<CInode*, map<client_t,Capability::Export> >::iterator p = stat.peer_exports.begin();
- p != stat.peer_exports.end();
- ++p) {
- CInode *in = p->first;
- dout(20) << " reexporting caps on " << *in << dendl;
- /*
- * bleh.. just export all caps for this inode. the auth mds
- * will pick them up during recovery.
- */
- bufferlist bl; // throw this away
- map<client_t,entity_inst_t> exported_client_map; // throw this away too
- encode_export_inode_caps(in, bl, exported_client_map);
- finish_export_inode_caps(in);
+ if (stat.state == IMPORT_ACKING) {
+   // Undo the cap import: drop every client cap on the imported inodes that
+   // is still flagged as new.
+   for (map<CInode*,map<client_t,Capability::Export> >::iterator p = stat.peer_exports.begin();
+        p != stat.peer_exports.end();
+        ++p) {
+     CInode *in = p->first;
+     for (map<client_t,Capability::Export>::iterator q = p->second.begin();
+          q != p->second.end();
+          ++q) {
+       Capability *cap = in->get_client_cap(q->first);
+       // fail loudly instead of dereferencing a null cap below
+       assert(cap);
+       if (cap->is_new())
+         in->remove_client_cap(q->first);
+     }
+   }
+   // Release the importing reference held on each session in client_map.
+   for (map<client_t,entity_inst_t>::iterator p = stat.client_map.begin();
+        p != stat.client_map.end();
+        ++p) {
+     Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->first.v));
+     // get_session() result is dereferenced right away; make a missing session explicit
+     assert(session);
+     session->dec_importing();
+   }
 }
// log our failure
assert (g_conf->mds_kill_import_at != 7);
// force open client sessions and finish cap import
- mds->server->finish_force_open_sessions(imported_client_map, sseqmap);
+ mds->server->finish_force_open_sessions(imported_client_map, sseqmap, false);
+ it->second.client_map.swap(imported_client_map);
map<inodeno_t,map<client_t,Capability::Import> > imported_caps;
-
for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
p != it->second.peer_exports.end();
++p) {
-
- finish_import_inode_caps(p->first, from, true, p->second, imported_caps[p->first->ino()]);
+ // pass peer = -1 to delay sending cap import messages to clients; they are sent from import_finish()
+ finish_import_inode_caps(p->first, -1, true, p->second, imported_caps[p->first->ino()]);
}
// send notify's etc.
{
CDir *dir = cache->get_dirfrag(m->get_dirfrag());
assert(dir);
- dout(7) << "handle_export_finish on " << *dir << dendl;
+ dout(7) << "handle_export_finish on " << *dir << (m->is_last() ? " last" : "") << dendl;
map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
assert(it != import_state.end());
- assert(it->second.state == IMPORT_ACKING);
assert(it->second.tid == m->get_tid());
- import_finish(dir, false);
+ import_finish(dir, false, m->is_last());
+
m->put();
}
-void Migrator::import_finish(CDir *dir, bool notify)
+void Migrator::import_finish(CDir *dir, bool notify, bool last)
{
dout(7) << "import_finish on " << *dir << dendl;
+ map<dirfrag_t,import_state_t>::iterator it = import_state.find(dir->dirfrag());
+ assert(it != import_state.end());
+ assert(it->second.state == IMPORT_ACKING || it->second.state == IMPORT_FINISHING);
+
// log finish
assert(g_conf->mds_kill_import_at != 9);
+ if (it->second.state == IMPORT_ACKING) {
+   // Still in the ACKING state: complete the cap import for every exported
+   // client cap (clear the new flag, merge the exported cap state, and issue
+   // the cap import via do_cap_import).
+   for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
+        p != it->second.peer_exports.end();
+        ++p) {
+     CInode *in = p->first;
+     assert(in->is_auth());
+     for (map<client_t,Capability::Export>::iterator q = p->second.begin();
+          q != p->second.end();
+          ++q) {
+       Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v));
+       assert(session);
+       Capability *cap = in->get_client_cap(q->first);
+       assert(cap);
+       cap->clear_new();
+       cap->merge(q->second, true);
+       mds->mdcache->do_cap_import(session, in, cap, q->second.cap_id, q->second.seq,
+                                   q->second.mseq - 1, it->second.peer, CEPH_CAP_FLAG_AUTH);
+     }
+     // this inode's exported caps have been processed; empty the per-inode map
+     p->second.clear();
+   }
+   // Release the importing reference held on each session in client_map.
+   for (map<client_t,entity_inst_t>::iterator p = it->second.client_map.begin();
+        p != it->second.client_map.end();
+        ++p) {
+     Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->first.v));
+     // same guard as in the cap loop above: never dereference a missing session
+     assert(session);
+     session->dec_importing();
+   }
+ }
+
+ if (!last) {
+ assert(it->second.state == IMPORT_ACKING);
+ it->second.state = IMPORT_FINISHING;
+ return;
+ }
+
// clear updated scatterlocks
/*
for (list<ScatterLock*>::iterator p = import_updated_scatterlocks[dir].begin();
import_remove_pins(dir, bounds);
map<CInode*, map<client_t,Capability::Export> > peer_exports;
- import_state[dir->dirfrag()].peer_exports.swap(peer_exports);
+ it->second.peer_exports.swap(peer_exports);
// clear import state (we're done!)
- import_state.erase(dir->dirfrag());
+ import_state.erase(it);
mds->mdlog->start_submit_entry(new EImportFinish(dir, true));
Capability *cap = in->get_client_cap(it->first);
if (!cap) {
cap = in->add_client_cap(it->first, session);
+ if (peer < 0)
+ cap->mark_new();
}
Capability::Import& im = import_map[it->first];
im.mseq = auth_cap ? it->second.mseq : cap->get_mseq();
im.issue_seq = cap->get_last_seq() + 1;
- cap->merge(it->second, auth_cap);
- mds->mdcache->do_cap_import(session, in, cap, it->second.cap_id,
- it->second.seq, it->second.mseq - 1, peer,
- auth_cap ? CEPH_CAP_FLAG_AUTH : 0);
+ if (peer >= 0) {
+ cap->merge(it->second, auth_cap);
+ mds->mdcache->do_cap_import(session, in, cap, it->second.cap_id,
+ it->second.seq, it->second.mseq - 1, peer,
+ auth_cap ? CEPH_CAP_FLAG_AUTH : 0);
+ }
}
in->replica_caps_wanted = 0;
const static int IMPORT_PREPPED = 4; // opened bounds, waiting for import
const static int IMPORT_LOGGINGSTART = 5; // got import, logging EImportStart
const static int IMPORT_ACKING = 6; // logged EImportStart, sent ack, waiting for finish
- const static int IMPORT_ABORTING = 7; // notifying bystanders of an abort before unfreezing
+ const static int IMPORT_FINISHING = 7; // sent cap imports, waiting for finish
+ const static int IMPORT_ABORTING = 8; // notifying bystanders of an abort before unfreezing
static const char *get_import_statename(int s) {
switch (s) {
case IMPORT_DISCOVERING: return "discovering";
case IMPORT_PREPPED: return "prepped";
case IMPORT_LOGGINGSTART: return "loggingstart";
case IMPORT_ACKING: return "acking";
+ case IMPORT_FINISHING: return "finishing";
case IMPORT_ABORTING: return "aborting";
default: assert(0); return 0;
}
set<int> bystanders;
list<dirfrag_t> bound_ls;
list<ScatterLock*> updated_scatterlocks;
+ map<client_t,entity_inst_t> client_map;
map<CInode*, map<client_t,Capability::Export> > peer_exports;
};
map<client_t,uint64_t>& sseqmap);
void handle_export_finish(MExportDirFinish *m);
public:
- void import_finish(CDir *dir, bool notify);
+ void import_finish(CDir *dir, bool notify, bool last=true);
protected:
void handle_export_caps(MExportCaps *m);