From: Sage Weil Date: Thu, 5 Jun 2008 18:53:18 +0000 (-0700) Subject: client: simplify caps import/export behavior X-Git-Tag: v0.3~164 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=14e2b0f890b93ef3773fe4e45422f8403a8776d8;p=ceph.git client: simplify caps import/export behavior --- diff --git a/src/client/Client.cc b/src/client/Client.cc index 5789e9f5579..3a1a4cc0b59 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -1183,27 +1183,28 @@ void Client::send_reconnect(int mds) for (hash_map::iterator p = inode_map.begin(); p != inode_map.end(); p++) { - if (p->second->caps.count(mds)) { + Inode *in = p->second; + if (in->caps.count(mds)) { dout(10) << " caps on " << p->first - << " " << cap_string(p->second->caps[mds].issued) - << " wants " << cap_string(p->second->caps_wanted()) + << " " << cap_string(in->caps[mds].issued) + << " wants " << cap_string(in->caps_wanted()) << dendl; - p->second->caps[mds].seq = 0; // reset seq. + in->caps[mds].seq = 0; // reset seq. m->add_inode_caps(p->first, // ino - p->second->caps_wanted(), // wanted - p->second->caps[mds].issued, // issued - p->second->inode.size, p->second->inode.mtime, p->second->inode.atime); + in->caps_wanted(), // wanted + in->caps[mds].issued, // issued + in->inode.size, in->inode.mtime, in->inode.atime); filepath path; - p->second->make_path(path); + in->make_path(path); dout(10) << " path on " << p->first << " is " << path << dendl; m->add_inode_path(p->first, path.get_path()); } - /* - if (p->second->stale_caps.count(mds)) { - dout(10) << " clearing stale caps on " << p->first << dendl; - p->second->stale_caps.erase(mds); // hrm, is this right? + if (in->exporting_mds == mds) { + dout(10) << " clearing exporting_caps on " << p->first << dendl; + in->exporting_mds = -1; + in->exporting_issued = 0; + in->exporting_mseq = 0; } - */ } // reset my cap seq number @@ -1408,22 +1409,19 @@ void Client::check_caps(Inode *in) in->inode, it->second.seq, it->second.issued, - wanted); + wanted, + 0); in->reported_size = in->inode.size; m->set_max_size(in->wanted_max_size); in->requested_max_size = in->wanted_max_size; messenger->send_message(m, mdsmap->get_inst(it->first)); - if (wanted == 0) { - it->second.seq = 0; - if (it->second.can_drop()) { - mds_sessions[it->first].num_caps--; - in->caps.erase(it); - } - } + if (wanted == 0) + mds_sessions[it->first].num_caps--; } - if (in->caps.empty()) { + if (wanted == 0) { dout(10) << "last caps on " << *in << dendl; + in->caps.clear(); put_inode(in); } } @@ -1530,74 +1528,85 @@ void Client::handle_file_caps(MClientFileCaps *m) return; } - // reap? if (m->get_op() == CEPH_CAP_OP_IMPORT) { - int other = m->get_migrate_mds(); - - /* - * FIXME: handle mds failures - */ - // fresh from new mds? if (!in->caps.count(mds)) { mds_sessions[mds].num_caps++; - if (in->caps.empty()) in->get(); - in->caps[mds].seq = m->get_seq(); - in->caps[mds].issued = m->get_caps(); - } - - if (in->caps.count(other) && - in->caps[other].issued_exporting) { - dout(5) << "handle_file_caps ino " << m->get_ino() << " seq " << m->get_seq() << " IMPORT to mds" << mds << " from mds" << other - << " - already saw EXPORT, done" << dendl; - in->caps[other].issued_exporting = 0; - if (in->caps[other].can_drop()) { - mds_sessions[other].num_caps--; - in->caps.erase(other); + if (in->caps.empty()) + in->get(); + if (in->exporting_mds == mds) { + dout(10) << " clearing exporting_caps on " << mds << dendl; + in->exporting_mds = -1; + in->exporting_issued = 0; + in->exporting_mseq = 0; } + } + in->caps[mds].seq = m->get_seq(); + in->caps[mds].issued = m->get_caps(); + in->caps[mds].mseq = m->get_mseq(); + + if (in->exporting_mseq < m->get_mseq()) { + dout(5) << "handle_file_caps ino " << m->get_ino() << " mseq " << m->get_mseq() + << " IMPORT from mds" << mds << ", clearing exporting_issued " << cap_string(in->exporting_issued) + << " mseq " << in->exporting_mseq << dendl; + in->exporting_issued = 0; + in->exporting_mseq = 0; + in->exporting_mds = -1; } else { - dout(5) << "handle_file_caps ino " << m->get_ino() << " seq " << m->get_seq() << " IMPORT to mds" << mds << " from mds" << other - << " - no EXPORT yet, marking" << dendl; - in->caps[mds].importing_from.insert(other); + dout(5) << "handle_file_caps ino " << m->get_ino() << " mseq " << m->get_mseq() + << " IMPORT from mds" << mds << ", keeping exporting_issued " << cap_string(in->exporting_issued) + << " mseq " << in->exporting_mseq << " by mds" << in->exporting_mds << dendl; } - - // fall-thru! - // (wake ppl up, etc.) + delete m; + return; + } + + // don't have cap? + // (it may be that we've reopened the file locally, + // and thus want caps, but don't have a cap yet. + // we should never reply to a cap out of turn.) + if (in->caps.count(mds) == 0) { + // silently drop. + dout(5) << "handle_file_caps on ino " << m->get_ino() + << " seq " << m->get_seq() + << " " << cap_string(m->get_caps()) + << ", don't have this cap, silently dropping." << dendl; + delete m; + return; } + + // ok! + InodeCap &cap = in->caps[mds]; + // stale? if (m->get_op() == CEPH_CAP_OP_EXPORT) { - // move to stale list - bool found_importing = false; + // note + bool found_higher_mseq = false; for (map::iterator p = in->caps.begin(); p != in->caps.end(); p++) { if (p->first == mds) continue; - if (p->second.importing_from.count(mds) == 0) continue; - - dout(5) << "handle_file_caps ino " << m->get_ino() << " seq " << m->get_seq() << " EXPORT from mds" << mds - << " - already saw IMPORT, done" << dendl; - p->second.importing_from.erase(mds); - mds_sessions[mds].num_caps--; - in->caps.erase(mds); - found_importing = true; - if (p->second.can_drop()) { - mds_sessions[p->first].num_caps--; - in->caps.erase(p); - if (in->caps.empty()) - put_inode(in); + if (p->second.mseq > m->get_mseq()) { + found_higher_mseq = true; + dout(5) << "handle_file_caps ino " << m->get_ino() << " mseq " << m->get_mseq() + << " EXPORT from mds" << mds + << ", but mds" << p->first << " has higher mseq " << p->second.mseq << dendl; + break; } - break; - } - if (!found_importing) { - dout(5) << "handle_file_caps ino " << m->get_ino() << " seq " << m->get_seq() << " EXPORT from mds" << mds - << " - no IMPORT yet, marking" << dendl; - if (in->caps.empty()) in->get(); - InodeCap &cap = in->caps[mds]; - cap.issued_exporting = in->caps[mds].issued; - cap.issued = 0; - cap.seq = 0; } + if (!found_higher_mseq) { + dout(5) << "handle_file_caps ino " << m->get_ino() << " mseq " << m->get_mseq() + << " EXPORT from mds" << mds + << ", setting exporting_issued " << cap_string(cap.issued) << dendl; + in->exporting_issued = cap.issued; + in->exporting_mseq = m->get_mseq(); + in->exporting_mds = mds; + in->caps.erase(mds); + if (in->caps.empty()) + put_inode(in); + } + delete m; return; } @@ -1623,23 +1632,6 @@ void Client::handle_file_caps(MClientFileCaps *m) return; } - // don't have cap? - // (it may be that we've reopened the file locally, - // and thus want caps, but don't have a cap yet. - // we should never reply to a cap out of turn.) - if (in->caps.count(mds) == 0) { - // silently drop. - dout(5) << "handle_file_caps on ino " << m->get_ino() - << " seq " << m->get_seq() - << " " << cap_string(m->get_caps()) - << ", don't have this cap, silently dropping." << dendl; - delete m; - return; - } - - - // ok! - InodeCap &cap = in->caps[mds]; cap.seq = m->get_seq(); // don't want it? @@ -1653,13 +1645,9 @@ void Client::handle_file_caps(MClientFileCaps *m) m->set_caps(0); m->set_wanted(0); messenger->send_message(m, m->get_source_inst()); - cap.seq = 0; - if (cap.can_drop()) { - mds_sessions[mds].num_caps--; - in->caps.erase(mds); - if (in->caps.empty()) - put_inode(in); - } + + // FIXME... + return; } @@ -2891,8 +2879,15 @@ int Client::_open(const filepath &path, int flags, mode_t mode, Fh **fhp, int ui dout(7) << " first caps on " << in->inode.ino << dendl; in->get(); } - if (in->caps.count(mds) == 0) + if (in->caps.count(mds) == 0) { mds_sessions[mds].num_caps++; + if (in->exporting_mds == mds) { + dout(10) << " clearing exporting_caps on " << mds << dendl; + in->exporting_mds = -1; + in->exporting_issued = 0; + in->exporting_mseq = 0; + } + } InodeCap &cap = in->caps[mds]; int new_caps = reply->get_file_caps(); @@ -2911,6 +2906,7 @@ int Client::_open(const filepath &path, int flags, mode_t mode, Fh **fhp, int ui cap.issued |= new_caps; cap.implemented |= new_caps; cap.seq = reply->get_file_caps_seq(); + cap.mseq = reply->get_file_caps_mseq(); // any caps? if (new_caps & ~old_caps) diff --git a/src/client/Client.h b/src/client/Client.h index d48e4ad01a7..59f19c3c00c 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -129,19 +129,10 @@ class InodeCap { public: unsigned issued; unsigned implemented; - unsigned seq; + __u64 seq; + __u32 mseq; // migration seq - int issued_exporting; // if export comes first - set importing_from; // if import comes first - - InodeCap() : issued(0), implemented(0), seq(0), - issued_exporting(0) {} - - bool can_drop() { - return issued == 0 && - issued_exporting == 0 && - importing_from.empty(); - } + InodeCap() : issued(0), implemented(0), seq(0), mseq(0) {} }; @@ -158,6 +149,9 @@ class Inode { // per-mds caps map caps; // mds -> InodeCap + unsigned exporting_issued; + int exporting_mds; + capseq_t exporting_mseq; //int open_by_mode[CEPH_FILE_MODE_NUM]; map open_by_mode; @@ -211,6 +205,7 @@ class Inode { //inode(_inode), lease_mask(0), lease_mds(-1), dir_auth(-1), dir_hashed(false), dir_replicated(false), + exporting_issued(0), exporting_mds(-1), exporting_mseq(0), reported_size(0), wanted_max_size(0), requested_max_size(0), ref(0), ll_ref(0), dir(0), dn(0), symlink(0), @@ -244,11 +239,11 @@ class Inode { bool put_cap_ref(int cap); int caps_issued() { - int c = 0; + int c = exporting_issued; for (map::iterator it = caps.begin(); it != caps.end(); it++) - c |= it->second.issued | it->second.issued_exporting; + c |= it->second.issued; return c; } diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index 5e62c9b01d3..88ec35d5d5f 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -598,6 +598,7 @@ struct ceph_mds_reply_head { __le32 result; __le32 file_caps; __le32 file_caps_seq; + __le32 file_caps_mseq; __le32 mdsmap_epoch; } __attribute__ ((packed)); @@ -716,7 +717,7 @@ struct ceph_mds_file_caps { __le32 caps, wanted; __le64 ino; __le64 size, max_size; - __le32 migrate_mds, migrate_seq; + __le32 migrate_seq; struct ceph_timespec mtime, atime, ctime; __le64 time_warp_seq; } __attribute__ ((packed)); diff --git a/src/mds/Capability.h b/src/mds/Capability.h index ea17da9adf4..6fb52ca2bee 100644 --- a/src/mds/Capability.h +++ b/src/mds/Capability.h @@ -54,20 +54,20 @@ public: int32_t wanted; int32_t issued; int32_t pending; - capseq_t seq; + capseq_t mseq; Export() {} - Export(int w, int i, int p, capseq_t s) : wanted(w), issued(i), pending(p), seq(s) {} + Export(int w, int i, int p, capseq_t s) : wanted(w), issued(i), pending(p), mseq(s) {} void encode(bufferlist &bl) const { ::encode(wanted, bl); ::encode(issued, bl); ::encode(pending, bl); - ::encode(seq, bl); + ::encode(mseq, bl); } void decode(bufferlist::iterator &p) { ::decode(wanted, p); ::decode(issued, p); ::decode(pending, p); - ::decode(seq, p); + ::decode(mseq, p); } }; @@ -78,6 +78,7 @@ private: map cap_history; // seq -> cap, [last_recv,last_sent] capseq_t last_sent, last_recv; capseq_t last_open; + capseq_t mseq; bool suppress; bool stale; @@ -90,10 +91,13 @@ public: last_sent(s), last_recv(s), last_open(0), + mseq(0), suppress(false), stale(false), session_caps_item(this) { } + capseq_t get_mseq() { return mseq; } + capseq_t get_last_open() { return last_open; } void set_last_open() { last_open = last_sent; } @@ -177,7 +181,7 @@ public: capseq_t get_last_seq() { return last_sent; } Export make_export() { - return Export(wanted_caps, issued(), pending(), last_sent); + return Export(wanted_caps, issued(), pending(), mseq+1); } void merge(Export& other) { // issued + pending @@ -188,7 +192,7 @@ public: // wanted wanted_caps = wanted_caps | other.wanted; - last_sent = MAX(last_sent, other.seq); + mseq = other.mseq; } void merge(int otherwanted, int otherissued) { // issued + pending diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc index ac7b3af338b..d7f98b5ff92 100644 --- a/src/mds/Locker.cc +++ b/src/mds/Locker.cc @@ -626,7 +626,8 @@ bool Locker::issue_caps(CInode *in) in->inode, cap->get_last_seq(), cap->pending(), - cap->wanted()), + cap->wanted(), + cap->get_mseq()), it->first); } } @@ -647,7 +648,8 @@ void Locker::issue_truncate(CInode *in) in->inode, cap->get_last_seq(), cap->pending(), - cap->wanted()), + cap->wanted(), + cap->get_mseq()), it->first); } @@ -905,7 +907,8 @@ void Locker::share_inode_max_size(CInode *in) in->inode, cap->get_last_seq(), cap->pending(), - cap->wanted()), + cap->wanted(), + cap->get_mseq()), client); } } diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index 9a88a8f527e..de5ff2958fb 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -2786,13 +2786,13 @@ void MDCache::rejoin_import_cap(CInode *in, int client, inode_caps_reconnect_t& Capability *cap = in->reconnect_cap(client, icr); session->touch_cap(cap); - // send REAP + // send IMPORT MClientFileCaps *reap = new MClientFileCaps(CEPH_CAP_OP_IMPORT, in->inode, cap->get_last_seq(), cap->pending(), - cap->wanted()); - reap->set_migrate_mds(frommds); // reap from whom? + cap->wanted(), + cap->get_mseq()); mds->messenger->send_message(reap, session->inst); } diff --git a/src/mds/Migrator.cc b/src/mds/Migrator.cc index 9776a8c8a86..da66547bc23 100644 --- a/src/mds/Migrator.cc +++ b/src/mds/Migrator.cc @@ -900,7 +900,8 @@ void Migrator::finish_export_inode_caps(CInode *in) in->inode, cap->get_last_seq(), cap->pending(), - cap->wanted()); + cap->wanted(), + cap->get_mseq()); mds->send_message_client(m, it->first); } in->clear_client_caps(); @@ -2055,7 +2056,7 @@ void Migrator::finish_import_inode_caps(CInode *in, int from, cap->get_last_seq(), cap->pending(), cap->wanted(), - from); + cap->get_mseq()); mds->send_message_client(caps, session->inst); } diff --git a/src/mds/Server.cc b/src/mds/Server.cc index c1820c41cbb..ae6267c1f68 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -401,7 +401,8 @@ void Server::handle_client_reconnect(MClientReconnect *m) fake_inode, 0, 0, // doesn't matter. - p->second.wanted); // doesn't matter. + p->second.wanted, // doesn't matter. + 0); // FIXME get proper mseq here? hmm. mds->send_message_client(stale, m->get_source_inst()); // add to cap export list. @@ -4391,6 +4392,7 @@ void Server::_do_open(MDRequest *mdr, CInode *cur) MClientReply *reply = new MClientReply(req, 0); reply->set_file_caps(cap->pending()); reply->set_file_caps_seq(cap->get_last_seq()); + reply->set_file_caps_mseq(cap->get_mseq()); //reply->set_file_data_version(fdv); reply_request(mdr, reply); diff --git a/src/messages/MClientFileCaps.h b/src/messages/MClientFileCaps.h index 4a28576e6c2..8c00e9d47ce 100644 --- a/src/messages/MClientFileCaps.h +++ b/src/messages/MClientFileCaps.h @@ -39,6 +39,7 @@ class MClientFileCaps : public Message { int get_caps() { return h.caps; } int get_wanted() { return h.wanted; } capseq_t get_seq() { return h.seq; } + capseq_t get_mseq() { return h.migrate_seq; } inodeno_t get_ino() { return inodeno_t(h.ino); } __u64 get_size() { return h.size; } @@ -48,8 +49,6 @@ class MClientFileCaps : public Message { utime_t get_atime() { return utime_t(h.atime); } __u64 get_time_warp_seq() { return h.time_warp_seq; } - // for cap migration - int get_migrate_mds() { return h.migrate_mds; } int get_migrate_seq() { return h.migrate_seq; } int get_op() { return h.op; } @@ -58,7 +57,6 @@ class MClientFileCaps : public Message { void set_max_size(__u64 ms) { h.max_size = ms; } - void set_migrate_mds(int m) { h.migrate_mds = m; } void set_migrate_seq(int m) { h.migrate_seq = m; } void set_op(int o) { h.op = o; } @@ -72,8 +70,7 @@ class MClientFileCaps : public Message { long seq, int caps, int wanted, - int mmds=0, - int mseq=0) : + int mseq) : Message(CEPH_MSG_CLIENT_FILECAPS) { h.op = op; h.seq = seq; @@ -82,7 +79,6 @@ class MClientFileCaps : public Message { h.ino = inode.ino; h.size = inode.size; h.max_size = inode.max_size; - h.migrate_mds = mmds; h.migrate_seq = mseq; inode.mtime.encode_timeval(&h.mtime); inode.atime.encode_timeval(&h.atime); @@ -99,8 +95,10 @@ class MClientFileCaps : public Message { << " wanted" << cap_string(h.wanted) << " size " << h.size << "/" << h.max_size << " mtime " << utime_t(h.mtime) - << " tws " << h.time_warp_seq - << ")"; + << " tws " << h.time_warp_seq; + if (h.migrate_seq) + out << " mseq " << h.migrate_seq; + out << ")"; } void decode_payload() { diff --git a/src/messages/MClientReply.h b/src/messages/MClientReply.h index 7604a219ec4..cd70d803965 100644 --- a/src/messages/MClientReply.h +++ b/src/messages/MClientReply.h @@ -228,11 +228,13 @@ class MClientReply : public Message { unsigned get_file_caps() { return st.file_caps; } unsigned get_file_caps_seq() { return st.file_caps_seq; } + unsigned get_file_caps_mseq() { return st.file_caps_mseq; } //uint64_t get_file_data_version() { return st.file_data_version; } void set_result(int r) { st.result = r; } void set_file_caps(unsigned char c) { st.file_caps = c; } - void set_file_caps_seq(long s) { st.file_caps_seq = s; } + void set_file_caps_seq(capseq_t s) { st.file_caps_seq = s; } + void set_file_caps_mseq(capseq_t s) { st.file_caps_mseq = s; } //void set_file_data_version(uint64_t v) { st.file_data_version = v; } MClientReply() {} diff --git a/src/vstartnew.sh b/src/vstartnew.sh index 73f7b03b845..e3d0b3db19e 100755 --- a/src/vstartnew.sh +++ b/src/vstartnew.sh @@ -48,8 +48,8 @@ do done # mds -$CEPH_BIN/cmds $ARGS --debug_ms 1 --debug_mds 20 --mds_thrash_fragments 0 --mds_thrash_exports 0 #--debug_ms 20 -$CEPH_BIN/cmds $ARGS --debug_ms 1 --debug_mds 20 --mds_thrash_fragments 0 --mds_thrash_exports 0 #--debug_ms 20 +$CEPH_BIN/cmds $ARGS --debug_ms 1 --debug_mds 20 --mds_thrash_fragments 0 --mds_thrash_exports 10 #--debug_ms 20 +$CEPH_BIN/cmds $ARGS --debug_ms 1 --debug_mds 20 --mds_thrash_fragments 0 --mds_thrash_exports 10 #--debug_ms 20 ./cmonctl mds set_max_mds 2 echo "started. stop.sh to stop. see out/* (e.g. 'tail -f out/????') for debug output."