dout(10) << "i am a surivivor, and will ack immediately" << dendl;
ack = new MMDSCacheRejoin(MMDSCacheRejoin::OP_ACK);
+ map<inodeno_t,map<client_t,Capability::Import> > imported_caps;
+
// check cap exports
for (map<inodeno_t,map<client_t,ceph_mds_cap_reconnect> >::iterator p = weak->cap_exports.begin();
p != weak->cap_exports.end();
q != p->second.end();
++q) {
dout(10) << " claiming cap import " << p->first << " client." << q->first << " on " << *in << dendl;
- rejoin_import_cap(in, q->first, q->second, from);
+ Capability *cap = rejoin_import_cap(in, q->first, q->second, from);
+ Capability::Import& im = imported_caps[p->first][q->first];
+ im.cap_id = cap->get_cap_id();
+ im.issue_seq = cap->get_last_seq();
+ im.mseq = cap->get_mseq();
}
mds->locker->eval(in, CEPH_CAP_LOCKS, true);
}
+
+ ::encode(imported_caps, ack->imported_caps);
} else {
assert(mds->is_rejoin());
// belongs to, were trimmed between sending cache rejoin and receiving rejoin ack.
assert(isolated_inodes.empty());
+ map<inodeno_t,map<client_t,Capability::Import> > peer_imported;
+ bufferlist::iterator bp = ack->imported_caps.begin();
+ ::decode(peer_imported, bp);
+
+ for (map<inodeno_t,map<client_t,Capability::Import> >::iterator p = peer_imported.begin();
+ p != peer_imported.end();
+ ++p) {
+ assert(cap_exports.count(p->first));
+ assert(cap_export_targets.count(p->first));
+ assert(cap_export_targets[p->first] == from);
+ for (map<client_t,Capability::Import>::iterator q = p->second.begin();
+ q != p->second.end();
+ ++q) {
+ assert(cap_exports[p->first].count(q->first));
+
+ dout(10) << " exporting caps for client." << q->first << " ino " << p->first << dendl;
+ Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v));
+ assert(session);
+
+ // mark client caps stale.
+ MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first, 0,
+ cap_exports[p->first][q->first].cap_id, 0);
+ mds->send_message_client_counted(m, session);
+
+ cap_exports[p->first].erase(q->first);
+ }
+ assert(cap_exports[p->first].empty());
+ }
+
// done?
assert(rejoin_ack_gather.count(from));
rejoin_ack_gather.erase(from);
}
assert(in->is_auth());
for (map<client_t,map<int,ceph_mds_cap_reconnect> >::iterator q = p->second.begin();
- q != p->second.end();
- ++q)
+ q != p->second.end();
+ ++q) {
+ Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v));
+ assert(session);
for (map<int,ceph_mds_cap_reconnect>::iterator r = q->second.begin();
- r != q->second.end();
- ++r) {
- dout(20) << " add_reconnected_cap " << in->ino() << " client." << q->first << dendl;
+ r != q->second.end();
+ ++r) {
add_reconnected_cap(in, q->first, inodeno_t(r->second.snaprealm));
- rejoin_import_cap(in, q->first, r->second, r->first);
+ Capability *cap = in->reconnect_cap(q->first, r->second, session);
+ if (r->first >= 0) {
+ do_cap_import(session, in, cap);
+
+ Capability::Import& im = rejoin_imported_caps[r->first][p->first][q->first];
+ im.cap_id = cap->get_cap_id();
+ im.issue_seq = cap->get_last_seq();
+ im.mseq = cap->get_mseq();
+ }
}
+ }
cap_imports.erase(p++); // remove and move on
}
} else {
- for (map<inodeno_t,map<client_t,ceph_mds_cap_reconnect> >::iterator q = cap_exports.begin();
- q != cap_exports.end();
- ++q) {
- for (map<client_t,ceph_mds_cap_reconnect>::iterator r = q->second.begin();
- r != q->second.end();
- ++r) {
- dout(10) << " exporting caps for client." << r->first << " ino " << q->first << dendl;
- Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(r->first.v));
- assert(session);
- // mark client caps stale.
- MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, q->first, 0, 0, 0);
- mds->send_message_client_counted(m, session);
- }
- }
-
trim_non_auth();
rejoin_gather.erase(mds->get_nodeid());
-void MDCache::rejoin_import_cap(CInode *in, client_t client, ceph_mds_cap_reconnect& icr, int frommds)
+Capability* MDCache::rejoin_import_cap(CInode *in, client_t client, ceph_mds_cap_reconnect& icr, int frommds)
{
dout(10) << "rejoin_import_cap for client." << client << " from mds." << frommds
<< " on " << *in << dendl;
cap->rejoin_import();
do_cap_import(session, in, cap);
}
+
+ return cap;
}
void MDCache::export_remaining_imported_caps()
// send acks
for (map<int,MMDSCacheRejoin*>::iterator p = ack.begin();
p != ack.end();
- ++p)
+ ++p) {
+ ::encode(rejoin_imported_caps[p->first], p->second->imported_caps);
mds->send_message_mds(p->second, p->first);
-
+ }
+
+ rejoin_imported_caps.clear();
}
set<int> rejoin_gather; // nodes from whom i need a rejoin
set<int> rejoin_sent; // nodes i sent a rejoin to
set<int> rejoin_ack_gather; // nodes from whom i need a rejoin ack
+ map<int,map<inodeno_t,map<client_t,Capability::Import> > > rejoin_imported_caps;
map<inodeno_t,map<client_t,ceph_mds_cap_reconnect> > cap_exports; // ino -> client -> capex
map<inodeno_t,int> cap_export_targets; // ino -> auth mds
map<client_t,MClientSnap*>& splits);
void do_realm_invalidate_and_update_notify(CInode *in, int snapop, bool nosend=false);
void send_snaps(map<client_t,MClientSnap*>& splits);
- void rejoin_import_cap(CInode *in, client_t client, ceph_mds_cap_reconnect& icr, int frommds);
+ Capability* rejoin_import_cap(CInode *in, client_t client, ceph_mds_cap_reconnect& icr, int frommds);
void finish_snaprealm_reconnect(client_t client, SnapRealm *realm, snapid_t seq);
void try_reconnect_cap(CInode *in, Session *session);
void export_remaining_imported_caps();