++p) {
CInode *in = get_inode(p->first);
assert(in);
- mds->server->add_reconnected_cap_inode(in);
+ reconnected_caps.insert(in);
for (map<int, map<int,ceph_mds_cap_reconnect> >::iterator q = p->second.begin();
q != p->second.end();
++q)
rejoin_import_cap(in, q->first, r->second, r->first);
}
- mds->server->process_reconnected_caps();
+ process_reconnected_caps();
identify_files_to_recover();
rejoin_send_acks();
// did we already get our acks too?
// this happens when the rejoin_gather has to wait on a MISSING/FULL exchange.
- if (rejoin_ack_gather.empty())
+ if (rejoin_ack_gather.empty()) {
mds->rejoin_done();
+
+ // finally, kickstart past snap parent opens
+ open_snap_parents();
+ }
+}
+
+/*
+ * choose lock states based on reconnected caps
+ *
+ * for every inode in reconnected_caps: pick a filelock state from the
+ * caps currently issued on it, then sort the inode of its snaprealm into
+ * have_open_snap_parents (local to this call) or missing_snap_parents
+ * (pinned with PIN_OPENINGSNAPPARENTS), depending on what
+ * have_past_parents_open() reports.
+ *
+ * NOTE(review): reconnected_caps is not cleared here, whereas the
+ * Server::process_reconnected_caps() removed by this same change ended
+ * with reconnected_caps.clear() -- confirm that is intended.
+ */
+void MDCache::process_reconnected_caps()
+{
+ dout(10) << "process_reconnected_caps" << dendl;
+
+ set<CInode*> have_open_snap_parents; // realm inodes already seen with their past parents open
+
+ map<int,MClientSnap*> splits; // NOTE(review): never referenced below -- unused local?
+
+ // adjust filelock state appropriately
+ for (set<CInode*>::iterator p = reconnected_caps.begin();
+ p != reconnected_caps.end();
+ ++p) {
+ CInode *in = *p;
+ int issued = in->get_caps_issued();
+ if (in->is_auth()) {
+ // wr? (when no WR/WRBUFFER cap is issued the filelock state is left as it is)
+ if (issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) {
+ if (issued & (CEPH_CAP_RDCACHE|CEPH_CAP_WRBUFFER)) {
+ in->filelock.set_state(LOCK_LONER);
+ } else {
+ in->filelock.set_state(LOCK_MIXED);
+ }
+ }
+ } else {
+ // note that client should perform stale/reap cleanup during reconnect.
+ assert((issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) == 0); // ????
+ if (in->filelock.is_xlocked())
+ in->filelock.set_state(LOCK_LOCK);
+ else
+ in->filelock.set_state(LOCK_SYNC); // might have been lock, previously
+ }
+ dout(15) << " issued " << cap_string(issued)
+ << " chose " << in->filelock
+ << " on " << *in << dendl;
+
+ SnapRealm *realm = in->find_snaprealm();
+ if (!missing_snap_parents.count(realm->inode) && // classify each realm inode at most once:
+ !have_open_snap_parents.count(realm->inode)) { // skip it if it is already in either set
+ if (realm->have_past_parents_open()) {
+ dout(10) << " have past snap parents for realm " << *realm << " on " << *realm->inode << dendl;
+ have_open_snap_parents.insert(realm->inode);
+ } else {
+ dout(10) << " MISSING past snap parents for realm " << *realm << " on " << *realm->inode << dendl;
+ missing_snap_parents.insert(realm->inode);
+ realm->inode->get(CInode::PIN_OPENINGSNAPPARENTS); // the matching put() is in open_snap_parents()
+ }
+ }
+
+ // also, make sure each cap is in the correct snaprealm.
+ //SnapRealm *r =
+
+ }
}
void MDCache::rejoin_import_cap(CInode *in, int client, ceph_mds_cap_reconnect& icr, int frommds)
// add cap
Capability *cap = in->reconnect_cap(client, icr);
session->touch_cap(cap);
-
- // send IMPORT
+
+ do_cap_import(session, in, cap);
+}
+
+
+// -------
+// cap imports and delayed snap parent opens
+
+void MDCache::do_cap_import(Session *session, CInode *in, Capability *cap) // send the IMPORT for cap to the session's client now, or queue it while the realm's past snap parents are not open
+{
+ int client = session->inst.name.num(); // only used as the delayed_imported_caps key below
SnapRealm *realm = in->find_snaprealm();
- MClientFileCaps *reap = new MClientFileCaps(CEPH_CAP_OP_IMPORT,
- in->inode,
- realm->inode->ino(),
- cap->get_last_seq(),
- cap->pending(),
- cap->wanted(),
- cap->get_mseq());
- realm->build_snap_trace(reap->snapbl);
- mds->messenger->send_message(reap, session->inst);
+ if (realm->have_past_parents_open()) {
+ dout(10) << "do_cap_import " << session->inst.name << " mseq " << cap->get_mseq() << " on " << *in << dendl;
+ MClientFileCaps *reap = new MClientFileCaps(CEPH_CAP_OP_IMPORT,
+ in->inode,
+ realm->inode->ino(),
+ cap->get_last_seq(),
+ cap->pending(),
+ cap->wanted(),
+ cap->get_mseq());
+ realm->build_snap_trace(reap->snapbl); // the realm's snap trace rides along in the same message
+ mds->messenger->send_message(reap, session->inst);
+ } else {
+ dout(10) << "do_cap_import missing past snap parents, delaying " << session->inst.name << " mseq "
+ << cap->get_mseq() << " on " << *in << dendl;
+ in->auth_pin(this); // released by the auth_unpin() in do_delayed_cap_imports()
+ cap->inc_suppress(); // pairs with dec_suppress() in do_delayed_cap_imports() (reached only if session and cap are still found there)
+ delayed_imported_caps[client].insert(in);
+ missing_snap_parents.insert(in); // NOTE(review): inserts 'in' itself and takes no PIN_OPENINGSNAPPARENTS, while process_reconnected_caps() inserts realm->inode with the pin and open_snap_parents() asserts in->snaprealm and put()s that pin for every entry -- confirm
+ }
}
+void MDCache::do_delayed_cap_imports() // retry every cap import that do_cap_import() queued in delayed_imported_caps
+{
+ dout(10) << "do_delayed_cap_imports" << dendl;
+
+ map<int,set<CInode*> > d; // client id -> inodes with a delayed import
+ d.swap(delayed_imported_caps); // take the whole queue; the member is now empty, so a requeue below does not disturb this walk
+
+ for (map<int,set<CInode*> >::iterator p = d.begin();
+ p != d.end();
+ p++) {
+ for (set<CInode*>::iterator q = p->second.begin();
+ q != p->second.end();
+ q++) {
+ CInode *in = *q;
+ Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->first));
+ if (session) {
+ Capability *cap = in->get_client_cap(p->first);
+ if (cap) {
+ do_cap_import(session, in, cap); // note: this may fail and requeue!
+ cap->dec_suppress(); // undoes the inc_suppress() from the original delay; a requeue has just taken its own
+ }
+ }
+ in->auth_unpin(this); // drops the auth_pin() from the original delay, even when the session or cap is gone
+ }
+ }
+}
+
+struct C_MDC_OpenSnapParents : public Context { // completion callback that re-enters MDCache::open_snap_parents()
+ MDCache *mdcache;
+ C_MDC_OpenSnapParents(MDCache *c) : mdcache(c) {}
+ void finish(int r) { // the result code r is not inspected
+ mdcache->open_snap_parents();
+ }
+};
+
+void MDCache::open_snap_parents() // try to open past snap parents for every inode in missing_snap_parents; once none are pending, run the delayed cap imports
+{
+ dout(10) << "open_snap_parents" << dendl;
+
+ C_Gather *gather = new C_Gather; // NOTE(review): not deleted on the "all open" path below -- confirm who owns it when no finisher is set
+
+ set<CInode*>::iterator p = missing_snap_parents.begin();
+ while (p != missing_snap_parents.end()) {
+ CInode *in = *p;
+ assert(in->snaprealm);
+ if (in->snaprealm->open_parents(gather->new_sub())) { // new_sub() is evaluated on every pass, whatever open_parents() returns
+ dout(10) << " past parents now open on " << *in << dendl;
+ missing_snap_parents.erase(p++); // step the iterator before its element is erased
+
+ in->put(CInode::PIN_OPENINGSNAPPARENTS);
+
+ // finish off client snaprealm reconnects?
+ map<inodeno_t,map<int,snapid_t> >::iterator q = reconnected_snaprealms.find(in->ino());
+ if (q != reconnected_snaprealms.end()) {
+ for (map<int,snapid_t>::iterator r = q->second.begin(); // r->first is the client, r->second the seq passed on
+ r != q->second.end();
+ r++)
+ finish_snaprealm_reconnect(r->first, in->snaprealm, r->second);
+ reconnected_snaprealms.erase(q);
+ }
+ } else {
+ dout(10) << " opening past parents on " << *in << dendl;
+ p++; // entry stays queued for a later pass
+ }
+ }
+
+ if (gather->get_num()) {
+ dout(10) << "open_snap_parents - waiting for " << gather->get_num() << dendl;
+ gather->set_finisher(new C_MDC_OpenSnapParents(this)); // finisher calls open_snap_parents() again; presumably fired once the gathered subs complete -- confirm against C_Gather
+ } else {
+ assert(missing_snap_parents.empty());
+ assert(reconnected_snaprealms.empty());
+ dout(10) << "open_snap_parents - all open" << dendl;
+ do_delayed_cap_imports();
+ }
+}
+
+void MDCache::finish_snaprealm_reconnect(int client, SnapRealm *realm, snapid_t seq) // send the client a snap update if the seq it reconnected with is older than the realm's newest seq
+{
+ if (seq < realm->get_newest_seq()) {
+ dout(10) << "finish_snaprealm_reconnect client" << client << " has old seq " << seq << " < "
+ << realm->get_newest_seq()
+ << " on " << *realm << dendl;
+ // send an update
+ Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(client));
+ if (session) {
+ MClientSnap *snap = new MClientSnap(CEPH_SNAP_OP_UPDATE);
+ realm->build_snap_trace(snap->bl); // message payload is the realm's snap trace
+ mds->send_message_client(snap, session->inst);
+ } else {
+ dout(10) << " ...or not, no session for this client!" << dendl;
+ }
+ } else { // seq is not older than the realm's newest: nothing is sent, only logged
+ dout(10) << "finish_snaprealm_reconnect client" << client << " up to date"
+ << " on " << *realm << dendl;
+ }
+}
+
+
+
void MDCache::rejoin_send_acks()
{
dout(7) << "rejoin_send_acks" << dendl;
// make sure snaprealm parents are open...
if (cur->snaprealm && !cur->snaprealm->open && mdr &&
- !cur->snaprealm->open_parents(mdr))
+ !cur->snaprealm->open_parents(new C_MDS_RetryRequest(this, mdr)))
return 1;
} else {
dout(7) << "remote link to " << dn->get_remote_ino() << ", which i don't have" << dendl;
assert(mdr); // we shouldn't hit non-primary dentries doing a non-mdr traversal!
- open_remote_ino(dn->get_remote_ino(), mdr, _get_waiter(mdr, req));
+ open_remote_ino(dn->get_remote_ino(), _get_waiter(mdr, req));
if (mds->logger) mds->logger->inc("trino");
return 1;
}
return in;
} else {
dout(10) << "get_dentry_inode on remote dn, opening inode for " << *dn << dendl;
- open_remote_ino(dn->get_remote_ino(), mdr, new C_MDS_RetryRequest(this, mdr));
+ open_remote_ino(dn->get_remote_ino(), new C_MDS_RetryRequest(this, mdr));
return 0;
}
}
class C_MDC_RetryOpenRemoteIno : public Context {
MDCache *mdcache;
inodeno_t ino;
- MDRequest *mdr;
Context *onfinish;
public:
- C_MDC_RetryOpenRemoteIno(MDCache *mdc, inodeno_t i, MDRequest *r, Context *c) :
- mdcache(mdc), ino(i), mdr(r), onfinish(c) {}
+ C_MDC_RetryOpenRemoteIno(MDCache *mdc, inodeno_t i, Context *c) : // the MDRequest is no longer carried; only the cache, the ino and the completion
+ mdcache(mdc), ino(i), onfinish(c) {}
void finish(int r) {
- mdcache->open_remote_ino(ino, mdr, onfinish);
+ mdcache->open_remote_ino(ino, onfinish); // re-issue the open with the same completion; r is not inspected
}
};
class C_MDC_OpenRemoteIno : public Context {
MDCache *mdcache;
inodeno_t ino;
- MDRequest *mdr;
Context *onfinish;
public:
vector<Anchor> anchortrace;
- C_MDC_OpenRemoteIno(MDCache *mdc, inodeno_t i, MDRequest *r, Context *c) :
- mdcache(mdc), ino(i), mdr(r), onfinish(c) {}
+ C_MDC_OpenRemoteIno(MDCache *mdc, inodeno_t i, Context *c) :
+ mdcache(mdc), ino(i), onfinish(c) {}
C_MDC_OpenRemoteIno(MDCache *mdc, inodeno_t i, vector<Anchor>& at,
- MDRequest *r, Context *c) :
- mdcache(mdc), ino(i), mdr(r), onfinish(c), anchortrace(at) {}
+ Context *c) :
+ mdcache(mdc), ino(i), onfinish(c), anchortrace(at) {}
void finish(int r) {
assert(r == 0);
if (r == 0)
- mdcache->open_remote_ino_2(ino, mdr, anchortrace, onfinish);
+ mdcache->open_remote_ino_2(ino, anchortrace, onfinish);
else {
onfinish->finish(r);
delete onfinish;
}
};
-void MDCache::open_remote_ino(inodeno_t ino,
- MDRequest *mdr,
- Context *onfinish)
+void MDCache::open_remote_ino(inodeno_t ino, Context *onfinish) // start opening ino: look up its anchor trace via the anchor client; onfinish is handed on to the continuation
{
dout(7) << "open_remote_ino on " << ino << dendl;
- C_MDC_OpenRemoteIno *c = new C_MDC_OpenRemoteIno(this, ino, mdr, onfinish);
+ C_MDC_OpenRemoteIno *c = new C_MDC_OpenRemoteIno(this, ino, onfinish); // c receives the trace in c->anchortrace and is also the lookup's completion
mds->anchorclient->lookup(ino, c->anchortrace, c);
}
void MDCache::open_remote_ino_2(inodeno_t ino,
- MDRequest *mdr,
vector<Anchor>& anchortrace,
Context *onfinish)
{
if (!in->dirfragtree.contains(frag)) {
dout(10) << "frag " << frag << " not valid, requerying anchortable" << dendl;
- open_remote_ino(ino, mdr, onfinish);
+ open_remote_ino(ino, onfinish);
return;
}
dout(10) << "opening remote dirfrag " << frag << " under " << *in << dendl;
/* we re-query the anchortable just to avoid a fragtree update race */
open_remote_dirfrag(in, frag,
- new C_MDC_RetryOpenRemoteIno(this, ino, mdr, onfinish));
+ new C_MDC_RetryOpenRemoteIno(this, ino, onfinish));
return;
}
if (!dir && in->is_auth()) {
if (dir->is_frozen_dir()) {
dout(7) << "traverse: " << *dir << " is frozen_dir, waiting" << dendl;
- dir->add_waiter(CDir::WAIT_UNFREEZE, _get_waiter(mdr, 0));
+ dir->add_waiter(CDir::WAIT_UNFREEZE, onfinish);
return;
}
dir = in->get_or_open_dirfrag(this, frag);
<< " in complete dir " << *dir
<< ", requerying anchortable"
<< dendl;
- open_remote_ino(ino, mdr, onfinish);
+ open_remote_ino(ino, onfinish);
} else {
dout(10) << "need ino " << anchortrace[i].ino
<< ", fetching incomplete dir " << *dir
<< dendl;
- dir->fetch(new C_MDC_OpenRemoteIno(this, ino, anchortrace, mdr, onfinish));
+ dir->fetch(new C_MDC_OpenRemoteIno(this, ino, anchortrace, onfinish));
}
} else {
// hmm, discover.
dout(10) << "have remote dirfrag " << *dir << ", discovering "
<< anchortrace[i].ino << dendl;
discover_ino(dir, anchortrace[i].ino,
- new C_MDC_OpenRemoteIno(this, ino, anchortrace, mdr, onfinish));
+ new C_MDC_OpenRemoteIno(this, ino, anchortrace, onfinish));
}
}
assert(dis->get_asker() != whoami);
- /*
if (mds->get_state() < MDSMap::STATE_ACTIVE) {
- dout(-7) << "discover_reply NOT ACTIVE YET" << dendl;
- delete dis;
+ dout(-7) << "discover_reply not yet active, delaying" << dendl;
+ mds->wait_for_active(new C_MDS_RetryMessage(mds, dis));
return;
}
- */
CInode *cur = 0;
new C_MDS_session_finish(mds, session, false, pv));
} else {
+ // snaprealms
+ for (map<inodeno_t, ceph_mds_snaprealm_reconnect>::iterator p = m->realms.begin();
+ p != m->realms.end();
+ p++) {
+ CInode *in = mdcache->get_inode(p->first);
+ if (in) {
+ assert(in->snaprealm);
+ if (in->snaprealm->have_past_parents_open())
+ mdcache->finish_snaprealm_reconnect(from, in->snaprealm, snapid_t(p->second.seq));
+ else
+ mdcache->add_reconnected_snaprealm(from, p->first, snapid_t(p->second.seq));
+ } else {
+ mdcache->add_reconnected_snaprealm(from, p->first, snapid_t(p->second.seq));
+ }
+ }
+
// caps
for (map<inodeno_t, cap_reconnect_t>::iterator p = m->caps.begin();
p != m->caps.end();
dout(15) << "open caps on " << *in << dendl;
Capability *cap = in->reconnect_cap(from, p->second.capinfo);
session->touch_cap(cap);
- reconnected_caps.insert(in);
+ mds->mdcache->add_reconnected_cap(in);
continue;
}
// remove from gather set
client_reconnect_gather.erase(from);
- if (client_reconnect_gather.empty()) reconnect_gather_finish();
+ if (client_reconnect_gather.empty())
+ reconnect_gather_finish();
delete m;
}
-/*
- * called by mdcache, late in rejoin (right before acks are sent)
- */
-void Server::process_reconnected_caps()
-{
- dout(10) << "process_reconnected_caps" << dendl;
-
- // adjust filelock state appropriately
- for (set<CInode*>::iterator p = reconnected_caps.begin();
- p != reconnected_caps.end();
- ++p) {
- CInode *in = *p;
- int issued = in->get_caps_issued();
- if (in->is_auth()) {
- // wr?
- if (issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) {
- if (issued & (CEPH_CAP_RDCACHE|CEPH_CAP_WRBUFFER)) {
- in->filelock.set_state(LOCK_LONER);
- } else {
- in->filelock.set_state(LOCK_MIXED);
- }
- }
- } else {
- // note that client should perform stale/reap cleanup during reconnect.
- assert((issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) == 0); // ????
- if (in->filelock.is_xlocked())
- in->filelock.set_state(LOCK_LOCK);
- else
- in->filelock.set_state(LOCK_SYNC); // might have been lock, previously
- }
- dout(15) << " issued " << cap_string(issued)
- << " chose " << in->filelock
- << " on " << *in << dendl;
- }
- reconnected_caps.clear(); // clean up
-}
void Server::reconnect_gather_finish()
dn->link_remote(in);
} else {
mdcache->open_remote_ino(dn->get_remote_ino(),
- mdr,
new C_MDS_RetryRequest(mdcache, mdr));
// touch everything i _do_ have
if (cur->inode.is_dir()) cmode = CEPH_FILE_MODE_PIN;
// register new cap
- Capability *cap = mds->locker->issue_new_caps(cur, cmode, mdr->session);
+ bool is_new = false;
+ Capability *cap = mds->locker->issue_new_caps(cur, cmode, mdr->session, is_new);
// drop our locks (they may interfere with us issuing new caps)
mdcache->request_drop_locks(mdr);
- cap->set_suppress(false); // stop suppressing messages on this cap
+ if (is_new)
+ cap->dec_suppress(); // stop suppressing messages on new cap
dout(12) << "_do_open issued caps " << cap_string(cap->pending())
<< " for " << req->get_source()