From: Yan, Zheng Date: Tue, 26 Nov 2013 02:17:30 +0000 (+0800) Subject: mds: send info of imported caps back to the exporter (cache rejoin) X-Git-Tag: v0.75~93^2~13 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=85171fd6c249b8de8060f884a776559c773bce7e;p=ceph.git mds: send info of imported caps back to the exporter (cache rejoin) Use cache rejoin ack message to send information of rejoin imported caps back to the exporter. Also move the code that exports reconnect caps to MDCache::handle_cache_rejoin_ack() This is preparation for including counterpart's information in cap import/export message. Signed-off-by: Yan, Zheng --- diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index f2520c88c294..d2942708fdbb 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -3876,6 +3876,8 @@ void MDCache::handle_cache_rejoin_weak(MMDSCacheRejoin *weak) dout(10) << "i am a surivivor, and will ack immediately" << dendl; ack = new MMDSCacheRejoin(MMDSCacheRejoin::OP_ACK); + map > imported_caps; + // check cap exports for (map >::iterator p = weak->cap_exports.begin(); p != weak->cap_exports.end(); @@ -3886,10 +3888,16 @@ void MDCache::handle_cache_rejoin_weak(MMDSCacheRejoin *weak) q != p->second.end(); ++q) { dout(10) << " claiming cap import " << p->first << " client." << q->first << " on " << *in << dendl; - rejoin_import_cap(in, q->first, q->second, from); + Capability *cap = rejoin_import_cap(in, q->first, q->second, from); + Capability::Import& im = imported_caps[p->first][q->first]; + im.cap_id = cap->get_cap_id(); + im.issue_seq = cap->get_last_seq(); + im.mseq = cap->get_mseq(); } mds->locker->eval(in, CEPH_CAP_LOCKS, true); } + + ::encode(imported_caps, ack->imported_caps); } else { assert(mds->is_rejoin()); @@ -4729,6 +4737,35 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack) // belongs to, were trimmed between sending cache rejoin and receiving rejoin ack. assert(isolated_inodes.empty()); + map > peer_imported; + bufferlist::iterator bp = ack->imported_caps.begin(); + ::decode(peer_imported, bp); + + for (map >::iterator p = peer_imported.begin(); + p != peer_imported.end(); + ++p) { + assert(cap_exports.count(p->first)); + assert(cap_export_targets.count(p->first)); + assert(cap_export_targets[p->first] == from); + for (map::iterator q = p->second.begin(); + q != p->second.end(); + ++q) { + assert(cap_exports[p->first].count(q->first)); + + dout(10) << " exporting caps for client." << q->first << " ino " << p->first << dendl; + Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v)); + assert(session); + + // mark client caps stale. + MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first, 0, + cap_exports[p->first][q->first].cap_id, 0); + mds->send_message_client_counted(m, session); + + cap_exports[p->first].erase(q->first); + } + assert(cap_exports[p->first].empty()); + } + // done? assert(rejoin_ack_gather.count(from)); rejoin_ack_gather.erase(from); @@ -4996,33 +5033,28 @@ bool MDCache::process_imported_caps() } assert(in->is_auth()); for (map >::iterator q = p->second.begin(); - q != p->second.end(); - ++q) + q != p->second.end(); + ++q) { + Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v)); + assert(session); for (map::iterator r = q->second.begin(); - r != q->second.end(); - ++r) { - dout(20) << " add_reconnected_cap " << in->ino() << " client." << q->first << dendl; + r != q->second.end(); + ++r) { add_reconnected_cap(in, q->first, inodeno_t(r->second.snaprealm)); - rejoin_import_cap(in, q->first, r->second, r->first); + Capability *cap = in->reconnect_cap(q->first, r->second, session); + if (r->first >= 0) { + do_cap_import(session, in, cap); + + Capability::Import& im = rejoin_imported_caps[r->first][p->first][q->first]; + im.cap_id = cap->get_cap_id(); + im.issue_seq = cap->get_last_seq(); + im.mseq = cap->get_mseq(); + } } + } cap_imports.erase(p++); // remove and move on } } else { - for (map >::iterator q = cap_exports.begin(); - q != cap_exports.end(); - ++q) { - for (map::iterator r = q->second.begin(); - r != q->second.end(); - ++r) { - dout(10) << " exporting caps for client." << r->first << " ino " << q->first << dendl; - Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(r->first.v)); - assert(session); - // mark client caps stale. - MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, q->first, 0, 0, 0); - mds->send_message_client_counted(m, session); - } - } - trim_non_auth(); rejoin_gather.erase(mds->get_nodeid()); @@ -5174,7 +5206,7 @@ void MDCache::clean_open_file_lists() -void MDCache::rejoin_import_cap(CInode *in, client_t client, ceph_mds_cap_reconnect& icr, int frommds) +Capability* MDCache::rejoin_import_cap(CInode *in, client_t client, ceph_mds_cap_reconnect& icr, int frommds) { dout(10) << "rejoin_import_cap for client." << client << " from mds." << frommds << " on " << *in << dendl; @@ -5187,6 +5219,8 @@ void MDCache::rejoin_import_cap(CInode *in, client_t client, ceph_mds_cap_reconn cap->rejoin_import(); do_cap_import(session, in, cap); } + + return cap; } void MDCache::export_remaining_imported_caps() @@ -5594,9 +5628,12 @@ void MDCache::rejoin_send_acks() // send acks for (map::iterator p = ack.begin(); p != ack.end(); - ++p) + ++p) { + ::encode(rejoin_imported_caps[p->first], p->second->imported_caps); mds->send_message_mds(p->second, p->first); - + } + + rejoin_imported_caps.clear(); } diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h index 3463efcf750e..fd66a32a03b4 100644 --- a/src/mds/MDCache.h +++ b/src/mds/MDCache.h @@ -411,6 +411,7 @@ protected: set rejoin_gather; // nodes from whom i need a rejoin set rejoin_sent; // nodes i sent a rejoin to set rejoin_ack_gather; // nodes from whom i need a rejoin ack + map > > rejoin_imported_caps; map > cap_exports; // ino -> client -> capex map cap_export_targets; // ino -> auth mds @@ -494,7 +495,7 @@ public: map& splits); void do_realm_invalidate_and_update_notify(CInode *in, int snapop, bool nosend=false); void send_snaps(map& splits); - void rejoin_import_cap(CInode *in, client_t client, ceph_mds_cap_reconnect& icr, int frommds); + Capability* rejoin_import_cap(CInode *in, client_t client, ceph_mds_cap_reconnect& icr, int frommds); void finish_snaprealm_reconnect(client_t client, SnapRealm *realm, snapid_t seq); void try_reconnect_cap(CInode *in, Session *session); void export_remaining_imported_caps(); diff --git a/src/messages/MMDSCacheRejoin.h b/src/messages/MMDSCacheRejoin.h index 4f079a91126e..d48cd67a4285 100644 --- a/src/messages/MMDSCacheRejoin.h +++ b/src/messages/MMDSCacheRejoin.h @@ -168,6 +168,7 @@ class MMDSCacheRejoin : public Message { // open map > cap_exports; + bufferlist imported_caps; // full bufferlist inode_base; @@ -299,6 +300,7 @@ public: ::encode(xlocked_inodes, payload); ::encode(wrlocked_inodes, payload); ::encode(cap_exports, payload); + ::encode(imported_caps, payload); ::encode(strong_dirfrags, payload); ::encode(dirfrag_bases, payload); ::encode(weak, payload); @@ -320,6 +322,7 @@ public: ::decode(xlocked_inodes, p); ::decode(wrlocked_inodes, p); ::decode(cap_exports, p); + ::decode(imported_caps, p); ::decode(strong_dirfrags, p); ::decode(dirfrag_bases, p); ::decode(weak, p);