From: Yan, Zheng Date: Tue, 26 Nov 2013 03:02:49 +0000 (+0800) Subject: mds: include counterpart's information in cap import/export messages X-Git-Tag: v0.75~93^2~11 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4fdeb00df20ccd36c1e53c6ea234c63c18a9ff5a;p=ceph.git mds: include counterpart's information in cap import/export messages When exporting inodes with client caps, the importer sends cap import messages to clients and the exporter sends cap export messages to clients. A client can receive these two messages in any order. If a client first receives the cap import message, it adds the imported caps, but the caps from the exporter are still considered valid. This can compromise consistency. If an MDS crashes while importing caps, clients may receive only cap export messages and never receive the cap import messages. These clients don't know which MDS is the cap importer, so they can't send a cap reconnect when the MDS recovers. We can handle the above issues by including the counterpart's information in cap import/export messages. If a client first receives the cap import message, it adds the imported caps, then removes the exporter's caps. If a client first receives the cap export message, it removes the exported caps, then adds caps for the importer. 
Signed-off-by: Yan, Zheng --- diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index f1f286faf71d..dabc40a9d84d 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -665,6 +665,15 @@ struct ceph_mds_caps { __le32 time_warp_seq; } __attribute__ ((packed)); +/* extra info for cap import/export */ +struct ceph_mds_cap_peer { + __le64 cap_id; + __le32 seq; + __le32 mseq; + __le32 mds; + __u8 flags; +} __attribute__ ((packed)); + /* cap release msg head */ struct ceph_mds_cap_release { __le32 num; /* number of cap_items that follow */ diff --git a/src/include/types.h b/src/include/types.h index 5a9e6f6d4c9d..33304521f909 100644 --- a/src/include/types.h +++ b/src/include/types.h @@ -242,6 +242,7 @@ WRITE_RAW_ENCODER(ceph_mds_request_head) WRITE_RAW_ENCODER(ceph_mds_request_release) WRITE_RAW_ENCODER(ceph_filelock) WRITE_RAW_ENCODER(ceph_mds_caps) +WRITE_RAW_ENCODER(ceph_mds_cap_peer) WRITE_RAW_ENCODER(ceph_mds_cap_release) WRITE_RAW_ENCODER(ceph_mds_cap_item) WRITE_RAW_ENCODER(ceph_mds_lease) diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc index 9c9360a0abf2..d5d5a19a4352 100644 --- a/src/mds/CInode.cc +++ b/src/mds/CInode.cc @@ -2555,8 +2555,8 @@ Capability *CInode::reconnect_cap(client_t client, ceph_mds_cap_reconnect& icr, cap->set_wanted(icr.wanted); cap->issue_norevoke(icr.issued); cap->reset_seq(); + cap->set_cap_id(icr.cap_id); } - cap->set_cap_id(icr.cap_id); cap->set_last_issue_stamp(ceph_clock_now(g_ceph_context)); return cap; } diff --git a/src/mds/Capability.h b/src/mds/Capability.h index 6f98aec05de0..c04a05285cf3 100644 --- a/src/mds/Capability.h +++ b/src/mds/Capability.h @@ -246,6 +246,7 @@ public: } ceph_seq_t get_mseq() { return mseq; } + void inc_mseq() { mseq++; } ceph_seq_t get_last_sent() { return last_sent; } utime_t get_last_issue_stamp() { return last_issue_stamp; } @@ -288,7 +289,6 @@ public: Export make_export() { return Export(cap_id, _wanted, issued(), pending(), client_follows, last_sent, mseq+1, last_issue_stamp); 
} - void rejoin_import() { mseq++; } void merge(Export& other, bool auth_cap) { // issued + pending int newpending = other.pending | pending(); diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index 21289a1b3346..811f31b0b952 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -4759,6 +4759,7 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack) // mark client caps stale. MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first, 0, cap_exports[p->first][q->first].cap_id, 0); + m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq, from, 0); mds->send_message_client_counted(m, session); cap_exports[p->first].erase(q->first); @@ -5043,7 +5044,9 @@ bool MDCache::process_imported_caps() add_reconnected_cap(in, q->first, inodeno_t(r->second.snaprealm)); Capability *cap = in->reconnect_cap(q->first, r->second, session); if (r->first >= 0) { - do_cap_import(session, in, cap); + if (cap->get_last_seq() == 0) // don't increase mseq if cap already exists + cap->inc_mseq(); + do_cap_import(session, in, cap, r->second.cap_id, 0, 0, r->first, 0); Capability::Import& im = rejoin_imported_caps[r->first][p->first][q->first]; im.cap_id = cap->get_cap_id(); @@ -5216,8 +5219,9 @@ Capability* MDCache::rejoin_import_cap(CInode *in, client_t client, ceph_mds_cap Capability *cap = in->reconnect_cap(client, icr, session); if (frommds >= 0) { - cap->rejoin_import(); - do_cap_import(session, in, cap); + if (cap->get_last_seq() == 0) // don't increase mseq if cap already exists + cap->inc_mseq(); + do_cap_import(session, in, cap, icr.cap_id, 0, 0, frommds, 0); } return cap; @@ -5240,6 +5244,7 @@ void MDCache::export_remaining_imported_caps() if (session) { // mark client caps stale. 
MClientCaps *stale = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first, 0, 0, 0); + stale->set_cap_peer(0, 0, 0, -1, 0); mds->send_message_client_counted(stale, q->first); } } @@ -5279,7 +5284,9 @@ void MDCache::try_reconnect_cap(CInode *in, Session *session) // ------- // cap imports and delayed snap parent opens -void MDCache::do_cap_import(Session *session, CInode *in, Capability *cap) +void MDCache::do_cap_import(Session *session, CInode *in, Capability *cap, + uint64_t p_cap_id, ceph_seq_t p_seq, ceph_seq_t p_mseq, + int peer, int p_flags) { client_t client = session->info.inst.name.num(); SnapRealm *realm = in->find_snaprealm(); @@ -5296,6 +5303,7 @@ void MDCache::do_cap_import(Session *session, CInode *in, Capability *cap) cap->get_mseq()); in->encode_cap_message(reap, cap); realm->build_snap_trace(reap->snapbl); + reap->set_cap_peer(p_cap_id, p_seq, p_mseq, peer, p_flags); mds->send_message_client_counted(reap, session); } else { dout(10) << "do_cap_import missing past snap parents, delaying " << session->info.inst.name << " mseq " @@ -5311,6 +5319,8 @@ void MDCache::do_delayed_cap_imports() { dout(10) << "do_delayed_cap_imports" << dendl; + assert(delayed_imported_caps.empty()); +#if 0 map > d; d.swap(delayed_imported_caps); @@ -5335,6 +5345,7 @@ void MDCache::do_delayed_cap_imports() mds->locker->issue_caps(in); } } +#endif } struct C_MDC_OpenSnapParents : public Context { diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h index fd66a32a03b4..0aa63c771b04 100644 --- a/src/mds/MDCache.h +++ b/src/mds/MDCache.h @@ -505,7 +505,9 @@ public: map > > missing_snap_parents; map > delayed_imported_caps; - void do_cap_import(Session *session, CInode *in, Capability *cap); + void do_cap_import(Session *session, CInode *in, Capability *cap, + uint64_t p_cap_id, ceph_seq_t p_seq, ceph_seq_t p_mseq, + int peer, int p_flags); void do_delayed_cap_imports(); void check_realm_past_parents(SnapRealm *realm); void open_snap_parents(); diff --git a/src/mds/Migrator.cc 
b/src/mds/Migrator.cc index 6fe4c785b382..30b037cc505e 100644 --- a/src/mds/Migrator.cc +++ b/src/mds/Migrator.cc @@ -1186,7 +1186,8 @@ void Migrator::encode_export_inode_caps(CInode *in, bufferlist& bl, exported_client_map[it->first] = mds->sessionmap.get_inst(entity_name_t::CLIENT(it->first.v)); } -void Migrator::finish_export_inode_caps(CInode *in) +void Migrator::finish_export_inode_caps(CInode *in, int peer, + map& peer_imported) { dout(20) << "finish_export_inode_caps " << *in << dendl; @@ -1198,21 +1199,23 @@ void Migrator::finish_export_inode_caps(CInode *in) it != in->client_caps.end(); ++it) { Capability *cap = it->second; - dout(7) << "finish_export_inode telling client." << it->first + dout(7) << "finish_export_inode_caps telling client." << it->first << " exported caps on " << *in << dendl; - MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, - in->ino(), - in->find_snaprealm()->inode->ino(), - cap->get_cap_id(), cap->get_last_seq(), - cap->pending(), cap->wanted(), 0, - cap->get_mseq()); + MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, in->ino(), 0, + cap->get_cap_id(), cap->get_mseq()); + + map::iterator q = peer_imported.find(it->first); + assert(q != peer_imported.end()); + m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq, peer, 0); mds->send_message_client_counted(m, it->first); } in->clear_client_caps_after_export(); mds->locker->eval(in, CEPH_CAP_LOCKS); } -void Migrator::finish_export_inode(CInode *in, utime_t now, list& finished) +void Migrator::finish_export_inode(CInode *in, utime_t now, int peer, + map& peer_imported, + list& finished) { dout(12) << "finish_export_inode " << *in << dendl; @@ -1254,7 +1257,7 @@ void Migrator::finish_export_inode(CInode *in, utime_t now, list& fini in->finish_export(now); - finish_export_inode_caps(in); + finish_export_inode_caps(in, peer, peer_imported); // *** other state too? 
@@ -1355,7 +1358,9 @@ int Migrator::encode_export_dir(bufferlist& exportbl, return num_exported; } -void Migrator::finish_export_dir(CDir *dir, list& finished, utime_t now) +void Migrator::finish_export_dir(CDir *dir, utime_t now, int peer, + map >& peer_imported, + list& finished) { dout(10) << "finish_export_dir " << *dir << dendl; @@ -1389,7 +1394,7 @@ void Migrator::finish_export_dir(CDir *dir, list& finished, utime_t no // inode? if (dn->get_linkage()->is_primary()) { - finish_export_inode(in, now, finished); + finish_export_inode(in, now, peer, peer_imported[in->ino()], finished); // subdirs? in->get_nested_dirfrags(subdirs); @@ -1398,7 +1403,7 @@ void Migrator::finish_export_dir(CDir *dir, list& finished, utime_t no // subdirs for (list::iterator it = subdirs.begin(); it != subdirs.end(); ++it) - finish_export_dir(*it, finished, now); + finish_export_dir(*it, now, peer, peer_imported, finished); } class C_MDS_ExportFinishLogged : public Context { @@ -1665,7 +1670,8 @@ void Migrator::export_finish(CDir *dir) // finish export (adjust local cache state) C_Contexts *fin = new C_Contexts(g_ceph_context); - finish_export_dir(dir, fin->contexts, ceph_clock_now(g_ceph_context)); + finish_export_dir(dir, ceph_clock_now(g_ceph_context), + it->second.peer, it->second.peer_imported, fin->contexts); dir->add_waiter(CDir::WAIT_UNFREEZE, fin); // unfreeze @@ -2382,7 +2388,8 @@ void Migrator::import_logged_start(dirfrag_t df, CDir *dir, int from, for (map >::iterator p = it->second.peer_exports.begin(); p != it->second.peer_exports.end(); ++p) { - finish_import_inode_caps(p->first, true, p->second, imported_caps[p->first->ino()]); + + finish_import_inode_caps(p->first, from, true, p->second, imported_caps[p->first->ino()]); } // send notify's etc. 
@@ -2561,7 +2568,7 @@ void Migrator::decode_import_inode_caps(CInode *in, } } -void Migrator::finish_import_inode_caps(CInode *in, bool auth_cap, +void Migrator::finish_import_inode_caps(CInode *in, int peer, bool auth_cap, map &export_map, map &import_map) { @@ -2583,7 +2590,9 @@ void Migrator::finish_import_inode_caps(CInode *in, bool auth_cap, im.issue_seq = cap->get_last_seq() + 1; cap->merge(it->second, auth_cap); - mds->mdcache->do_cap_import(session, in, cap); + mds->mdcache->do_cap_import(session, in, cap, it->second.cap_id, + it->second.seq, it->second.mseq - 1, peer, + auth_cap ? CEPH_CAP_FLAG_AUTH : 0); } in->replica_caps_wanted = 0; @@ -2863,7 +2872,7 @@ void Migrator::logged_import_caps(CInode *in, map imported_caps; assert(peer_exports.count(in)); - finish_import_inode_caps(in, false, peer_exports[in], imported_caps); + finish_import_inode_caps(in, from, false, peer_exports[in], imported_caps); mds->locker->eval(in, CEPH_CAP_LOCKS, true); mds->send_message_mds(new MExportCapsAck(in->ino()), from); diff --git a/src/mds/Migrator.h b/src/mds/Migrator.h index 10d54511388b..58dce1cb67a2 100644 --- a/src/mds/Migrator.h +++ b/src/mds/Migrator.h @@ -238,14 +238,20 @@ public: map& exported_client_map); void encode_export_inode_caps(CInode *in, bufferlist& bl, map& exported_client_map); - void finish_export_inode(CInode *in, utime_t now, list& finished); - void finish_export_inode_caps(CInode *in); + void finish_export_inode(CInode *in, utime_t now, int target, + map& peer_imported, + list& finished); + void finish_export_inode_caps(CInode *in, int target, + map& peer_imported); + int encode_export_dir(bufferlist& exportbl, CDir *dir, map& exported_client_map, utime_t now); - void finish_export_dir(CDir *dir, list& finished, utime_t now); + void finish_export_dir(CDir *dir, utime_t now, int target, + map >& peer_imported, + list& finished); void add_export_finish_waiter(CDir *dir, Context *c) { map::iterator it = export_state.find(dir); @@ -299,7 +305,7 @@ 
public: void decode_import_inode_caps(CInode *in, bufferlist::iterator &blp, map >& cap_imports); - void finish_import_inode_caps(CInode *in, bool auth_cap, + void finish_import_inode_caps(CInode *in, int from, bool auth_cap, map &export_map, map &import_map); int decode_import_dir(bufferlist::iterator& blp, diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 1d249403ddb7..8242d226ff57 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -6340,7 +6340,7 @@ void Server::_rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDen finish_force_open_sessions(mdr->more()->imported_client_map, mdr->more()->sseq_map); if (mdr->more()->cap_imports.count(destdnl->get_inode())) { mds->mdcache->migrator->finish_import_inode_caps(destdnl->get_inode(), - mdr->more()->srcdn_auth_mds, + mdr->more()->srcdn_auth_mds, true, mdr->more()->cap_imports[destdnl->get_inode()], imported_caps); } @@ -6734,7 +6734,8 @@ void Server::_commit_slave_rename(MDRequest *mdr, int r, ::decode(peer_imported, bp); dout(10) << " finishing inode export on " << *destdnl->get_inode() << dendl; - mdcache->migrator->finish_export_inode(destdnl->get_inode(), mdr->now, finished); + mdcache->migrator->finish_export_inode(destdnl->get_inode(), mdr->now, + mdr->slave_to_mds, peer_imported, finished); mds->queue_waiters(finished); // this includes SINGLEAUTH waiters. 
// unfreeze diff --git a/src/messages/MClientCaps.h b/src/messages/MClientCaps.h index 117f24162e7d..04a393d2c7f1 100644 --- a/src/messages/MClientCaps.h +++ b/src/messages/MClientCaps.h @@ -21,11 +21,12 @@ class MClientCaps : public Message { - static const int HEAD_VERSION = 2; // added flock metadata + static const int HEAD_VERSION = 3; // added flock metadata static const int COMPAT_VERSION = 1; public: struct ceph_mds_caps head; + struct ceph_mds_cap_peer peer; bufferlist snapbl; bufferlist xattrbl; bufferlist flockbl; @@ -73,6 +74,14 @@ class MClientCaps : public Message { void set_mtime(const utime_t &t) { t.encode_timeval(&head.mtime); } void set_atime(const utime_t &t) { t.encode_timeval(&head.atime); } + void set_cap_peer(uint64_t id, ceph_seq_t seq, ceph_seq_t mseq, int mds, int flags) { + peer.cap_id = id; + peer.seq = seq; + peer.mseq = mseq; + peer.mds = mds; + peer.flags = flags; + } + MClientCaps() : Message(CEPH_MSG_CLIENT_CAPS, HEAD_VERSION, COMPAT_VERSION) { } MClientCaps(int op, @@ -95,6 +104,7 @@ class MClientCaps : public Message { head.wanted = wanted; head.dirty = dirty; head.migrate_seq = mseq; + peer.cap_id = 0; } MClientCaps(int op, inodeno_t ino, inodeno_t realm, @@ -106,6 +116,7 @@ class MClientCaps : public Message { head.realm = realm; head.cap_id = id; head.migrate_seq = mseq; + peer.cap_id = 0; } private: ~MClientCaps() {} @@ -151,10 +162,22 @@ public: // conditionally decode flock metadata if (header.version >= 2) ::decode(flockbl, p); + + if (header.version >= 3) { + if (head.op == CEPH_CAP_OP_IMPORT) + ::decode(peer, p); + else if (head.op == CEPH_CAP_OP_EXPORT) + memcpy(&peer, &head.size, sizeof(peer)); + } } void encode_payload(uint64_t features) { head.snap_trace_len = snapbl.length(); head.xattr_len = xattrbl.length(); + + // record peer in unused fields of cap export message + if (head.op == CEPH_CAP_OP_EXPORT) + memcpy(&head.size, &peer, sizeof(peer)); + ::encode(head, payload); ::encode_nohead(snapbl, payload); @@ -164,7 
+187,16 @@ public: if (features & CEPH_FEATURE_FLOCK) { ::encode(flockbl, payload); } else { - header.version = 1; // old + header.version = 1; + return; + } + + if (true) { + if (head.op == CEPH_CAP_OP_IMPORT) + ::encode(peer, payload); + } else { + header.version = 2; + return; } } };