From 1fae43b08eec804112a0572a9b98cd95ce8e611c Mon Sep 17 00:00:00 2001 From: sageweil Date: Fri, 16 Nov 2007 21:46:52 +0000 Subject: [PATCH] merged r1958:2075 from branches/sage/mds back into trunk git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2076 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/TODO | 51 ++--- trunk/ceph/client/Client.cc | 46 ++-- trunk/ceph/client/Client.h | 4 +- trunk/ceph/client/SyntheticClient.cc | 25 +- trunk/ceph/include/ceph_fs.h | 2 + trunk/ceph/include/filepath.h | 77 ++++--- trunk/ceph/include/types.h | 23 +- trunk/ceph/kernel/mds_client.c | 12 +- trunk/ceph/mds/CDentry.cc | 33 ++- trunk/ceph/mds/CDentry.h | 6 +- trunk/ceph/mds/CDir.cc | 2 +- trunk/ceph/mds/CInode.cc | 46 ++-- trunk/ceph/mds/CInode.h | 19 +- trunk/ceph/mds/ClientMap.h | 33 ++- trunk/ceph/mds/Locker.cc | 21 +- trunk/ceph/mds/LogEvent.cc | 8 +- trunk/ceph/mds/LogEvent.h | 20 +- trunk/ceph/mds/LogSegment.h | 4 +- trunk/ceph/mds/MDBalancer.cc | 2 +- trunk/ceph/mds/MDCache.cc | 257 +++++++++++++++++---- trunk/ceph/mds/MDCache.h | 11 +- trunk/ceph/mds/MDLog.cc | 10 +- trunk/ceph/mds/MDLog.h | 3 + trunk/ceph/mds/MDS.cc | 52 +---- trunk/ceph/mds/MDS.h | 6 +- trunk/ceph/mds/Migrator.cc | 279 +++++++++++++++++++---- trunk/ceph/mds/Migrator.h | 47 ++-- trunk/ceph/mds/Server.cc | 200 +++++++++++----- trunk/ceph/mds/Server.h | 5 +- trunk/ceph/mds/events/EImportStart.h | 6 + trunk/ceph/mds/events/ESession.h | 12 +- trunk/ceph/mds/events/ESessions.h | 55 +++++ trunk/ceph/mds/events/EUpdate.h | 3 + trunk/ceph/mds/journal.cc | 52 ++++- trunk/ceph/mds/mdstypes.h | 25 +- trunk/ceph/messages/MClientFileCaps.h | 16 +- trunk/ceph/messages/MClientReply.h | 3 + trunk/ceph/messages/MClientRequest.h | 38 +-- trunk/ceph/messages/MDirUpdate.h | 13 +- trunk/ceph/messages/MExportCaps.h | 50 ++++ trunk/ceph/messages/MExportCapsAck.h | 46 ++++ trunk/ceph/messages/MExportDirDiscover.h | 8 +- trunk/ceph/messages/MMDSGetMap.h | 12 +- trunk/ceph/messages/MMDSSlaveRequest.h | 12 +- trunk/ceph/mon/MDSMonitor.cc | 16 +- trunk/ceph/mon/MDSMonitor.h | 4 +- trunk/ceph/msg/FakeMessenger.cc | 7 +- trunk/ceph/msg/FakeMessenger.h | 9 - trunk/ceph/msg/Message.cc | 11 + trunk/ceph/msg/Message.h | 3 + trunk/ceph/msg/Messenger.h | 20 +- trunk/ceph/msg/SimpleMessenger.cc | 30 +-- trunk/ceph/msg/SimpleMessenger.h | 6 +- trunk/ceph/msg/msg_types.h | 3 +- trunk/ceph/osdc/Journaler.h | 1 + 55 files changed, 1254 insertions(+), 511 deletions(-) create mode 100644 trunk/ceph/mds/events/ESessions.h create mode 100644 trunk/ceph/messages/MExportCaps.h create mode 100644 trunk/ceph/messages/MExportCapsAck.h diff --git a/trunk/ceph/TODO b/trunk/ceph/TODO index 78c0b4e8918f8..37a2a81728876 100644 --- a/trunk/ceph/TODO +++ b/trunk/ceph/TODO @@ -16,48 +16,32 @@ code cleanup - word size - clean up all encoded structures -kernel planning -- soft consistency on (kernel) lookup? -- accurate reconstruction of (syscall) path? mds mustfix -- open file rejournaling vs capped log... - - open files vs shutdown in general! need to export any caps on replicated metadata -- export caps to auth on unlinked inodes -- stray purge on shutdown - - rename slave in-memory rollback on failure - -- fix purge_stray bug -- try_remove_unlinked_dn thing - -- client session open from locker.. doesn't work properly with delays - -> journal the session open _with_ the import(start) - - proper handling of cache expire messages during rejoin phase? + -> i think cache expires are fine; the rejoin_ack handler just has to behave if rejoining items go missing +- try_remove_unlinked_dn thing -- verify once-per-segment jouranl context is working... +- rerun destro trace against latest, with various journal lengths mds +- stray reintegration - extend/clean up filepath to allow paths relative to an ino - fix path_traverse - fix reconnect/rejoin open file weirdness - -- get rid of replicate objects for replicate_to .. encode to bufferlists directly - -- stray reintegration -- verify stray is empty on shutdown - - real chdir (directory "open") - relative metadata ops - +- get rid of C*Discover objects for replicate_to .. encode to bufferlists directly? - consistency points/snapshots - dentry versions vs dirfrags... - - detect and deal with client failure - failure during reconnect vs clientmap. although probalby the whole thing needs a larger overhaul... +- inode.rmtime (recursive mtime) +- make inode.size reflect directory size (number of entries) + - inode.max_size - inode.allocated_size @@ -85,6 +69,13 @@ mds - delayed replica caps release... we need to set a timer event? (and cancel it when appropriate?) +client +- clean up client mds session vs mdsmap behavior? +- client caps migration races + - caps need a seq number; reap logic needs to be a bit smarter + - also needs cope with mds failures +- fstat + osdmon - allow fresh replacement osds. add osd_created in osdmap, probably @@ -173,11 +164,14 @@ reliability ebofs - allow holes +- allow btree sets +- optionally scrub deallocated extents +- clone() + +- map ObjectStore - verify proper behavior of conflicting/overlapping reads of clones - combine inodes and/or cnodes into same blocks -- allow btree sets instead of maps -- eliminate nodepools - nonblocking write on missing onodes? - fix bug in node rotation on insert (and reenable) - fix NEAR_LAST_FWD (?) @@ -226,11 +220,6 @@ crush -client -- fstat -- mixed lazy and non-lazy io will clobber each others' caps in the buffer cache.. how to isolate.. -- test client caps migration w/ mds exports -- some heuristic behavior to consolidate caps to inode auth? diff --git a/trunk/ceph/client/Client.cc b/trunk/ceph/client/Client.cc index 2f82cb5f935c5..cad6af279c561 100644 --- a/trunk/ceph/client/Client.cc +++ b/trunk/ceph/client/Client.cc @@ -509,8 +509,7 @@ Dentry *Client::lookup(filepath& path) Dentry *dn = 0; for (unsigned i=0; ihave_inst(mds)) { + if (!mdsmap->is_active(mds)) { dout(10) << "no address for mds" << mds << ", requesting new mdsmap" << dendl; int mon = monmap->pick_mon(); - messenger->send_message(new MMDSGetMap(), + messenger->send_message(new MMDSGetMap(mdsmap->get_epoch()), monmap->get_inst(mon)); waiting_for_mdsmap.push_back(&cond); cond.Wait(client_lock); - if (!mdsmap->have_inst(mds)) { + if (!mdsmap->is_active(mds)) { dout(10) << "hmm, still have no address for mds" << mds << ", trying a random mds" << dendl; request.resend_mds = mdsmap->get_random_in_mds(); continue; @@ -803,6 +803,8 @@ void Client::send_request(MetaRequest *request, int mds) } request->request = 0; + r->set_mdsmap_epoch(mdsmap->get_epoch()); + dout(10) << "send_request " << *r << " to mds" << mds << dendl; messenger->send_message(r, mdsmap->get_inst(mds)); @@ -1109,11 +1111,22 @@ void Client::handle_file_caps(MClientFileCaps *m) mds_sessions[mds]++; // reap? - if (m->get_op() == MClientFileCaps::OP_REAP) { + if (m->get_op() == MClientFileCaps::OP_IMPORT) { int other = m->get_mds(); + /* + * FIXME: there is a race here.. if the caps are exported twice in succession, + * you may get the second import before the first, in which case the middle MDS's + * import and then export won't be handled properly. + * there should be a sequence number attached to the cap, incremented each time + * it is exported... + */ + /* + * FIXME: handle mds failures + */ + if (in && in->stale_caps.count(other)) { - dout(5) << "handle_file_caps on ino " << m->get_ino() << " from mds" << mds << " reap on mds" << other << dendl; + dout(5) << "handle_file_caps on ino " << m->get_ino() << " from mds" << mds << " imported from mds" << other << dendl; // fresh from new mds? if (!in->caps.count(mds)) { @@ -1128,7 +1141,7 @@ void Client::handle_file_caps(MClientFileCaps *m) // fall-thru! } else { - dout(5) << "handle_file_caps on ino " << m->get_ino() << " from mds" << mds << " premature (!!) reap on mds" << other << dendl; + dout(5) << "handle_file_caps on ino " << m->get_ino() << " from mds" << mds << " premature (!!) import from mds" << other << dendl; // delay! cap_reap_queue[in->ino()][other] = m; return; @@ -1138,8 +1151,8 @@ void Client::handle_file_caps(MClientFileCaps *m) assert(in); // stale? - if (m->get_op() == MClientFileCaps::OP_STALE) { - dout(5) << "handle_file_caps on ino " << m->get_ino() << " seq " << m->get_seq() << " from mds" << mds << " now stale" << dendl; + if (m->get_op() == MClientFileCaps::OP_EXPORT) { + dout(5) << "handle_file_caps on ino " << m->get_ino() << " seq " << m->get_seq() << " from mds" << mds << " now exported/stale" << dendl; // move to stale list assert(in->caps.count(mds)); @@ -1201,6 +1214,7 @@ void Client::handle_file_caps(MClientFileCaps *m) << " seq " << m->get_seq() << " " << cap_string(m->get_caps()) << ", which we don't want caps for, releasing." << dendl; + m->set_op(MClientFileCaps::OP_ACK); m->set_caps(0); m->set_wanted(0); messenger->send_message(m, m->get_source_inst()); @@ -1593,7 +1607,7 @@ int Client::_link(const char *existing, const char *newname) MClientRequest *req = new MClientRequest(MDS_OP_LINK, messenger->get_myinst()); req->set_path(newname); - req->set_sarg(existing); + req->set_path2(existing); // FIXME where does FUSE maintain user information req->set_caller_uid(getuid()); @@ -1672,7 +1686,7 @@ int Client::_rename(const char *from, const char *to) { MClientRequest *req = new MClientRequest(MDS_OP_RENAME, messenger->get_myinst()); req->set_path(from); - req->set_sarg(to); + req->set_path2(to); // FIXME where does FUSE maintain user information req->set_caller_uid(getuid()); @@ -1800,7 +1814,7 @@ int Client::_symlink(const char *target, const char *link) { MClientRequest *req = new MClientRequest(MDS_OP_SYMLINK, messenger->get_myinst()); req->set_path(link); - req->set_sarg(target); + req->set_path2(target); // FIXME where does FUSE maintain user information req->set_caller_uid(getuid()); @@ -1887,7 +1901,7 @@ int Client::_do_lstat(const char *path, int mask, Inode **in) req = new MClientRequest(MDS_OP_LSTAT, messenger->get_myinst()); req->head.args.stat.mask = mask; - req->set_path(fpath); + req->set_filepath(fpath); MClientReply *reply = make_request(req); res = reply->get_result(); @@ -2307,7 +2321,7 @@ int Client::_readdir_get_frag(DirResult *dirp) diri = inode_map[ino]; dout(10) << "_readdir_get_frag got diri " << diri << " " << diri->inode.ino << dendl; assert(diri); - assert(diri->inode.mode & INODE_MODE_DIR); + assert(diri->inode.is_dir()); } if (!dirp->inode && diri) { diff --git a/trunk/ceph/client/Client.h b/trunk/ceph/client/Client.h index 850f9d2b67909..64c09afc68b55 100644 --- a/trunk/ceph/client/Client.h +++ b/trunk/ceph/client/Client.h @@ -211,9 +211,7 @@ class Inode { inodeno_t ino() { return inode.ino; } - bool is_dir() { - return (inode.mode & INODE_TYPE_MASK) == INODE_MODE_DIR; - } + bool is_dir() { return inode.is_dir(); } int file_caps() { int c = 0; diff --git a/trunk/ceph/client/SyntheticClient.cc b/trunk/ceph/client/SyntheticClient.cc index c44ab65a995a1..78b00d4b581d1 100644 --- a/trunk/ceph/client/SyntheticClient.cc +++ b/trunk/ceph/client/SyntheticClient.cc @@ -118,6 +118,11 @@ void parse_syn_options(vector& args) syn_iargs.push_back( atoi(args[++i]) ); syn_iargs.push_back( atoi(args[++i]) ); syn_iargs.push_back( atoi(args[++i]) ); + } else if (strcmp(args[i],"makefiles2") == 0) { + syn_modes.push_back( SYNCLIENT_MODE_MAKEFILES2 ); + syn_iargs.push_back( atoi(args[++i]) ); + syn_iargs.push_back( atoi(args[++i]) ); + syn_iargs.push_back( atoi(args[++i]) ); } else if (strcmp(args[i],"linktest") == 0) { syn_modes.push_back( SYNCLIENT_MODE_LINKTEST ); } else if (strcmp(args[i],"createshared") == 0) { @@ -259,7 +264,7 @@ string SyntheticClient::get_sarg(int seq) } if (a.length() == 0 || a == "~") { char s[20]; - sprintf(s,"syn.%d.%d", client->whoami, seq); + sprintf(s,"/syn.%d.%d", client->whoami, seq); a = s; } return a; @@ -1360,7 +1365,7 @@ int SyntheticClient::clean_dir(string& basedir) continue; } - if ((st.st_mode & INODE_TYPE_MASK) == INODE_MODE_DIR) { + if ((st.st_mode & S_IFMT) == S_IFDIR) { clean_dir(file); client->rmdir(file.c_str()); } else { @@ -1429,7 +1434,7 @@ int SyntheticClient::full_walk(string& basedir) file.c_str()); - if ((st.st_mode & INODE_TYPE_MASK) == INODE_MODE_DIR) { + if ((st.st_mode & S_IFMT) == S_IFDIR) { dirq.push_back(file); } } @@ -1645,9 +1650,15 @@ int SyntheticClient::open_shared(int num, int count) for (int n=0; nopen(d,O_RDONLY); - fds.push_back(fd); + if (fd > 0) fds.push_back(fd); } + if (false && client->get_nodeid() == 0) + for (int n=0; nunlink(d); + } + while (!fds.empty()) { int fd = fds.front(); fds.pop_front(); @@ -2814,10 +2825,10 @@ void SyntheticClient::import_find(const char *base, const char *find, bool data) assert(modestring.length() == 10); mode_t mode = 0; switch (modestring[0]) { - case 'd': mode |= INODE_MODE_DIR; break; - case 'l': mode |= INODE_MODE_SYMLINK; break; + case 'd': mode |= S_IFDIR; break; + case 'l': mode |= S_IFLNK; break; default: - case '-': mode |= INODE_MODE_FILE; break; + case '-': mode |= S_IFREG; break; } if (modestring[1] == 'r') mode |= 0400; if (modestring[2] == 'w') mode |= 0200; diff --git a/trunk/ceph/include/ceph_fs.h b/trunk/ceph/include/ceph_fs.h index e98f44950901f..c81356e51e6d6 100644 --- a/trunk/ceph/include/ceph_fs.h +++ b/trunk/ceph/include/ceph_fs.h @@ -264,6 +264,7 @@ enum { struct ceph_client_request_head { struct ceph_entity_inst client_inst; __u64 tid, oldest_client_tid; + __u64 mdsmap_epoch; /* on client */ __u32 num_fwd; __u32 retry_attempt; ceph_ino_t mds_wants_replica_in_dirino; @@ -324,6 +325,7 @@ struct ceph_client_reply_head { __s32 result; __u32 file_caps; __u64 file_caps_seq; + __u64 mdsmap_epoch; }; struct ceph_client_reply_inode { diff --git a/trunk/ceph/include/filepath.h b/trunk/ceph/include/filepath.h index 4425e1d7c5b3a..c92663049bf1f 100644 --- a/trunk/ceph/include/filepath.h +++ b/trunk/ceph/include/filepath.h @@ -30,13 +30,13 @@ using namespace std; #include "buffer.h" - +#include "encodable.h" class filepath { /** path - * can be relative "a/b/c" or absolute "/a/b/c". */ - string path; + inodeno_t ino; // base inode + string path; // relative path /** bits - path segemtns * this is ['a', 'b', 'c'] for both the aboslute and relative case. @@ -74,40 +74,29 @@ class filepath { } public: - filepath() {} - filepath(const string& s) { - set_path(s); - } - filepath(const char* s) { - set_path(s); - } + filepath() : ino(0) {} + filepath(const string& s, inodeno_t i=1) : ino(i), path(s) { } + filepath(const char* s, inodeno_t i=1) : ino(i), path(s) { } filepath(const filepath& o) { - set_path(o.get_path()); + ino = o.ino; + path = o.path; + bits = o.bits; } - // accessors - const string& get_path() const { - return path; - } - const char *c_str() const { - return path.c_str(); - } + inodeno_t get_ino() const { return ino; } + const string& get_path() const { return path; } + const char *c_str() const { return path.c_str(); } - int length() const { - return path.length(); - } + int length() const { return path.length(); } unsigned depth() const { if (bits.empty() && path.length() > 0) parse_bits(); return bits.size(); } - bool empty() const { - return path.length() == 0; - } + bool empty() const { return path.length() == 0; } - // FIXME: const-edness - bool absolute() { return path.length() && path[0] == '/'; } - bool relative() { return !absolute(); } + bool absolute() const { return ino > 0; } + bool relative() const { return !absolute(); } const string& operator[](int i) const { if (bits.empty() && path.length() > 0) parse_bits(); @@ -121,6 +110,7 @@ class filepath { filepath prefixpath(int s) const { filepath t; + t.ino = ino; for (int i=0; i 1) + out << '#' << hex << path.get_ino() << dec; + if (path.get_ino() > 0 && path.depth()) + out << '/'; return out << path.get_path(); } diff --git a/trunk/ceph/include/types.h b/trunk/ceph/include/types.h index c21c267ec0fdd..374efe362afe7 100644 --- a/trunk/ceph/include/types.h +++ b/trunk/ceph/include/types.h @@ -150,11 +150,6 @@ namespace __gnu_cxx { } -#define INODE_MODE_FILE 0100000 // S_IFREG -#define INODE_MODE_SYMLINK 0120000 // S_IFLNK -#define INODE_MODE_DIR 0040000 // S_IFDIR -#define INODE_TYPE_MASK 0170000 - #define FILE_MODE_R 1 #define FILE_MODE_W 2 #define FILE_MODE_RW (1|2) @@ -179,6 +174,9 @@ namespace __gnu_cxx { inline int DT_TO_MODE(int dt) { return dt << 12; } +inline unsigned char MODE_TO_DT(int mode) { + return mode >> 12; +} struct inode_t { // base (immutable) @@ -202,25 +200,18 @@ struct inode_t { int64_t size, max_size, allocated_size; utime_t mtime; // file data modify time. utime_t atime; // file data access time. + utime_t rmtime; // recursive mtime // special stuff version_t version; // auth only version_t file_data_version; // auth only // file type - bool is_symlink() { return (mode & INODE_TYPE_MASK) == INODE_MODE_SYMLINK; } - bool is_dir() { return (mode & INODE_TYPE_MASK) == INODE_MODE_DIR; } - bool is_file() { return (mode & INODE_TYPE_MASK) == INODE_MODE_FILE; } - - // corresponding d_types - static const unsigned char DT_REG = 8; - static const unsigned char DT_DIR = 4; - static const unsigned char DT_LNK = 10; + bool is_symlink() { return (mode & S_IFMT) == S_IFLNK; } + bool is_dir() { return (mode & S_IFMT) == S_IFDIR; } + bool is_file() { return (mode & S_IFMT) == S_IFREG; } }; -inline unsigned char MODE_TO_DT(int mode) { - return mode >> 12; -} diff --git a/trunk/ceph/kernel/mds_client.c b/trunk/ceph/kernel/mds_client.c index ff43bc8693b09..d73759878ce3f 100644 --- a/trunk/ceph/kernel/mds_client.c +++ b/trunk/ceph/kernel/mds_client.c @@ -233,7 +233,8 @@ void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client) struct ceph_msg * ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, - ceph_ino_t baseino, const char *path, const char *path2) + ceph_ino_t ino1, const char *path1, + ceph_ino_t ino2, const char *path2) { struct ceph_msg *req; struct ceph_client_request_head *head; @@ -241,7 +242,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, req = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, sizeof(struct ceph_client_request_head) + - sizeof(baseino) + strlen(path) + 1 + strlen(path2) + 1, + sizeof(ino1)*2 + strlen(path) + strlen(path2) + 2 0, 0); if (IS_ERR(req)) return req; @@ -256,9 +257,10 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, /*FIXME: head->oldest_client_tid = cpu_to_le64(....);*/ /* encode paths */ - ceph_encode_64(&p, end, baseino); - memcpy(p, path, strlen(path)+1); - p += strlen(path)+1; + ceph_encode_64(&p, end, ino1); + memcpy(p, path1, strlen(path1)+1); + p += strlen(path1)+1; + ceph_encode_64(&p, end, ino2); memcpy(p, path2, strlen(path2)+1); p += strlen(path2)+1; BUG_ON(p != end); diff --git a/trunk/ceph/mds/CDentry.cc b/trunk/ceph/mds/CDentry.cc index 2b6bb3470e8a8..ce1fd04c14af4 100644 --- a/trunk/ceph/mds/CDentry.cc +++ b/trunk/ceph/mds/CDentry.cc @@ -41,7 +41,7 @@ ostream& CDentry::print_db_line_prefix(ostream& out) ostream& operator<<(ostream& out, CDentry& dn) { - string path; + filepath path; dn.make_path(path); out << "[dentry " << path; @@ -59,10 +59,10 @@ ostream& operator<<(ostream& out, CDentry& dn) if (dn.is_null()) out << " NULL"; if (dn.is_remote()) { out << " REMOTE("; - switch (dn.get_remote_d_type()) { - case inode_t::DT_REG: out << "reg"; break; - case inode_t::DT_DIR: out << "dir"; break; - case inode_t::DT_LNK: out << "lnk"; break; + switch (dn.get_remote_d_type() << 12) { + case S_IFREG: out << "reg"; break; + case S_IFDIR: out << "dir"; break; + case S_IFLNK: out << "lnk"; break; default: assert(0); } out << ")"; @@ -186,10 +186,10 @@ void CDentry::mark_new() state_set(STATE_NEW); } -void CDentry::make_path(string& s) +void CDentry::make_path_string(string& s) { if (dir) { - dir->inode->make_path(s); + dir->inode->make_path_string(s); } else { s = "???"; } @@ -197,6 +197,19 @@ void CDentry::make_path(string& s) s += name; } +void CDentry::make_path(filepath& fp) +{ + assert(dir); + if (dir->inode->is_base()) + fp.set_ino(dir->inode->ino()); // base case + else if (dir->inode->get_parent_dn()) + dir->inode->get_parent_dn()->make_path(fp); // recurse + else + fp.set_ino(dir->inode->ino()); // relative but not base? hrm! + fp.push_dentry(name); +} + +/* void CDentry::make_path(string& s, inodeno_t tobase) { assert(dir); @@ -211,6 +224,7 @@ void CDentry::make_path(string& s, inodeno_t tobase) } s += name; } +*/ /** make_anchor_trace * construct an anchor trace for this dentry, as if it were linked to *in. @@ -302,6 +316,11 @@ void CDentry::adjust_nested_auth_pins(int by) dir->adjust_nested_auth_pins(by); } +bool CDentry::is_frozen() +{ + return dir->is_frozen(); +} + // ---------------------------- // locking diff --git a/trunk/ceph/mds/CDentry.h b/trunk/ceph/mds/CDentry.h index 416792beb8778..b99ad9ea603d5 100644 --- a/trunk/ceph/mds/CDentry.h +++ b/trunk/ceph/mds/CDentry.h @@ -26,6 +26,7 @@ using namespace std; #include "include/buffer.h" #include "include/lru.h" #include "include/xlist.h" +#include "include/filepath.h" #include "mdstypes.h" #include "SimpleLock.h" @@ -166,6 +167,7 @@ public: void auth_pin(); void auth_unpin(); void adjust_nested_auth_pins(int by); + bool is_frozen(); // dentry type is primary || remote || null @@ -184,8 +186,8 @@ public: const CDentry& operator= (const CDentry& right); // misc - void make_path(string& p); - void make_path(string& p, inodeno_t tobase); + void make_path_string(string& s); + void make_path(filepath& fp); void make_anchor_trace(vector& trace, CInode *in); // -- version -- diff --git a/trunk/ceph/mds/CDir.cc b/trunk/ceph/mds/CDir.cc index b4663b269c659..adaf5fa6c0d4f 100644 --- a/trunk/ceph/mds/CDir.cc +++ b/trunk/ceph/mds/CDir.cc @@ -45,7 +45,7 @@ ostream& operator<<(ostream& out, CDir& dir) { - string path; + filepath path; dir.get_inode()->make_path(path); out << "[dir " << dir.dirfrag() << " " << path << "/"; if (dir.is_auth()) { diff --git a/trunk/ceph/mds/CInode.cc b/trunk/ceph/mds/CInode.cc index 3bdfc89e3f1fa..4320750c50eb8 100644 --- a/trunk/ceph/mds/CInode.cc +++ b/trunk/ceph/mds/CInode.cc @@ -46,7 +46,7 @@ ostream& CInode::print_db_line_prefix(ostream& out) ostream& operator<<(ostream& out, CInode& in) { - string path; + filepath path; in.make_path(path); out << "[inode " << in.inode.ino << " " << path << (in.is_dir() ? "/ ":" "); if (in.is_auth()) { @@ -351,10 +351,10 @@ CInode *CInode::get_parent_inode() -void CInode::make_path(string& s) +void CInode::make_path_string(string& s) { if (parent) { - parent->make_path(s); + parent->make_path_string(s); } else if (is_root()) { s = ""; // root @@ -370,6 +370,14 @@ void CInode::make_path(string& s) } } +void CInode::make_path(filepath& fp) +{ + if (parent) + parent->make_path(fp); + else + fp.set_ino(ino()); +} + void CInode::make_anchor_trace(vector& trace) { if (parent) { @@ -486,8 +494,10 @@ void CInode::encode_lock_state(int type, bufferlist& bl) list dfls; get_dirfrags(dfls); for (list::iterator p = dfls.begin(); p != dfls.end(); ++p) - if ((*p)->is_auth()) - myfrags.insert((*p)->get_frag()); + if ((*p)->is_auth()) { + frag_t fg = (*p)->get_frag(); + myfrags.insert(fg); + } _encode(myfrags, bl); } break; @@ -501,13 +511,15 @@ void CInode::encode_lock_state(int type, bufferlist& bl) case LOCK_OTYPE_IDIR: _encode(inode.mtime, bl); if (0) { - map dfsz; + map frag_sizes; for (map::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) - if (p->second->is_auth()) - dfsz[p->first] = p->second->get_nitems(); - _encode(dfsz, bl); + if (p->second->is_auth()) { + //frag_t fg = (*p)->get_frag(); + //frag_sizes[f] = dirfrag_size[fg]; + } + _encode(frag_sizes, bl); } break; @@ -738,8 +750,8 @@ void CInode::adjust_nested_auth_pins(int a) pair CInode::authority() { - if (force_auth.first >= 0) - return force_auth; + if (inode_auth.first >= 0) + return inode_auth; if (parent) return parent->dir->authority(); @@ -779,10 +791,6 @@ void CInode::encode_export(bufferlist& bl) ::_encode_simple(replica_map, bl); - map cap_map; - export_client_caps(cap_map); - ::_encode_simple(cap_map, bl); - authlock._encode(bl); linklock._encode(bl); dirfragtreelock._encode(bl); @@ -803,7 +811,6 @@ void CInode::finish_export(utime_t now) } void CInode::decode_import(bufferlist::iterator& p, - set& new_client_caps, LogSegment *ls) { utime_t old_mtime = inode.mtime; @@ -826,13 +833,12 @@ void CInode::decode_import(bufferlist::iterator& p, ::_decode_simple(replica_map, p); if (!replica_map.empty()) get(PIN_REPLICATED); - map cap_map; - ::_decode_simple(cap_map, p); - merge_client_caps(cap_map, new_client_caps); - authlock._decode(p); linklock._decode(p); dirfragtreelock._decode(p); filelock._decode(p); dirlock._decode(p); } + + + diff --git a/trunk/ceph/mds/CInode.h b/trunk/ceph/mds/CInode.h index 8f453472a0477..b6da550a8d7b4 100644 --- a/trunk/ceph/mds/CInode.h +++ b/trunk/ceph/mds/CInode.h @@ -68,6 +68,7 @@ class CInode : public MDSCacheObject { static const int PIN_PURGING = -12; static const int PIN_FREEZING = 13; static const int PIN_FROZEN = 14; + static const int PIN_IMPORTINGCAPS = 15; const char *pin_name(int p) { switch (p) { @@ -81,8 +82,10 @@ class CInode : public MDSCacheObject { case PIN_BATCHOPENJOURNAL: return "batchopenjournal"; case PIN_SCATTERED: return "scattered"; case PIN_STICKYDIRS: return "stickydirs"; + case PIN_PURGING: return "purging"; case PIN_FREEZING: return "freezing"; case PIN_FROZEN: return "frozen"; + case PIN_IMPORTINGCAPS: return "importingcaps"; default: return generic_pin_name(p); } } @@ -96,6 +99,7 @@ class CInode : public MDSCacheObject { static const int STATE_FREEZING = (1<<7); static const int STATE_FROZEN = (1<<8); static const int STATE_AMBIGUOUSAUTH = (1<<9); + static const int STATE_EXPORTINGCAPS = (1<<10); // -- waiters -- //static const int WAIT_SLAVEAGREE = (1<<0); @@ -126,7 +130,7 @@ class CInode : public MDSCacheObject { inode_t inode; // the inode itself string symlink; // symlink dest, if symlink fragtree_t dirfragtree; // dir frag tree, if any. always consistent with our dirfrag map. - //map dirfrag_size; // size of each dirfrag + map dirfrag_size; // size of each dirfrag off_t last_journaled; // log offset for the last time i was journaled off_t last_open_journaled; // log offset for the last journaled EOpen @@ -182,7 +186,7 @@ public: CDentry *parent; // primary link set remote_parents; // if hard linked - pair force_auth; + pair inode_auth; // -- distributed state -- protected: @@ -226,7 +230,7 @@ public: last_journaled(0), last_open_journaled(0), //hack_accessed(true), stickydir_ref(0), - parent(0), force_auth(CDIR_AUTH_DEFAULT), + parent(0), inode_auth(CDIR_AUTH_DEFAULT), replica_caps_wanted(0), xlist_dirty(this), xlist_open_file(this), xlist_dirty_inode_mtime(this), xlist_purging_inode(this), @@ -277,7 +281,8 @@ public: } // -- misc -- - void make_path(string& s); + void make_path_string(string& s); + void make_path(filepath& s); void make_anchor_trace(vector& trace); void name_stray_dentry(string& dname); @@ -305,10 +310,8 @@ public: void abort_export() { put(PIN_TEMPEXPORTING); } - void decode_import(bufferlist::iterator& p, - set& new_client_caps, - LogSegment *ls); - + void decode_import(bufferlist::iterator& p, LogSegment *ls); + // -- locks -- public: diff --git a/trunk/ceph/mds/ClientMap.h b/trunk/ceph/mds/ClientMap.h index c36e66d240a33..59b3dde49b6be 100644 --- a/trunk/ceph/mds/ClientMap.h +++ b/trunk/ceph/mds/ClientMap.h @@ -53,6 +53,7 @@ public: version_t get_committing() { return committing; } version_t get_committed() { return committed; } + void set_version(version_t v) { version = v; } version_t inc_projected() { return ++projected; } void reset_projected() { projected = version; } void set_committing(version_t v) { committing = v; } @@ -82,6 +83,7 @@ public: void add_opening(int c) { opening.insert(c); } bool is_closing(int c) { return closing.count(c); } void add_closing(int c) { closing.insert(c); } + void remove_closing(int c) { closing.erase(c); } bool have_session(int client) { return client_inst.count(client); } @@ -97,6 +99,16 @@ public: client_inst.erase(client); version++; } + void noop() { + version++; + } + void open_sessions(map& cm) { + for (map::iterator p = cm.begin(); p != cm.end(); ++p) { + client_inst[p->first] = p->second; + sessions.insert(p->first); + } + version++; + } private: // -- push sequence -- @@ -113,19 +125,18 @@ public: private: // -- completed requests -- - // client id -> tid -> result code - map > completed_requests; // completed client requests - map > waiting_for_trim; + // who -> { tid set ... } + map > completed_requests; + map > waiting_for_trim; version_t requestmapv; public: void add_completed_request(metareqid_t ri) { - completed_requests[ri.client].insert(ri.tid); + completed_requests[ri.name].insert(ri.tid); requestmapv++; } - void trim_completed_requests(int client, - tid_t mintid) { // zero means trim all! - map >::iterator p = completed_requests.find(client); + void trim_completed_requests(entity_name_t who, tid_t mintid) { // zero means trim all! + map >::iterator p = completed_requests.find(who); if (p == completed_requests.end()) return; @@ -136,7 +147,7 @@ public: completed_requests.erase(p); // kick waiters - map >::iterator q = waiting_for_trim.find(client); + map >::iterator q = waiting_for_trim.find(who); if (q != waiting_for_trim.end()) { list fls; while (!q->second.empty() && @@ -150,11 +161,11 @@ public: } } void add_trim_waiter(metareqid_t ri, Context *c) { - waiting_for_trim[ri.client][ri.tid] = c; + waiting_for_trim[ri.name][ri.tid] = c; } bool have_completed_request(metareqid_t ri) { - return completed_requests.count(ri.client) && - completed_requests[ri.client].count(ri.tid); + return completed_requests.count(ri.name) && + completed_requests[ri.name].count(ri.tid); } diff --git a/trunk/ceph/mds/Locker.cc b/trunk/ceph/mds/Locker.cc index db4f1ca585e3f..ce027a694686c 100644 --- a/trunk/ceph/mds/Locker.cc +++ b/trunk/ceph/mds/Locker.cc @@ -237,6 +237,7 @@ bool Locker::acquire_locks(MDRequest *mdr, MDSCacheObjectInfo info; (*q)->set_object_info(info); req->get_authpins().push_back(info); + mdr->pin(*q); } mds->send_message_mds(req, p->first); @@ -452,7 +453,7 @@ Capability* Locker::issue_new_caps(CInode *in, dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl; // my needs - int my_client = req->get_client(); + int my_client = req->get_client().num(); int my_want = 0; if (mode & FILE_MODE_R) my_want |= CAP_FILE_RDCACHE | CAP_FILE_RD; if (mode & FILE_MODE_W) my_want |= CAP_FILE_WRBUFFER | CAP_FILE_WR; @@ -546,7 +547,7 @@ bool Locker::issue_caps(CInode *in) if (seq > 0 && !it->second.is_suppress()) { dout(7) << " sending MClientFileCaps to client" << it->first << " seq " << it->second.get_last_seq() << " new pending " << cap_string(it->second.pending()) << " was " << cap_string(before) << dendl; - mds->send_message_client_maybe_opening(new MClientFileCaps(MClientFileCaps::OP_GRANT, + mds->send_message_client(new MClientFileCaps(MClientFileCaps::OP_GRANT, in->inode, it->second.get_last_seq(), it->second.pending(), @@ -729,7 +730,7 @@ void Locker::handle_client_file_caps(MClientFileCaps *m) MClientFileCaps *r = new MClientFileCaps(MClientFileCaps::OP_RELEASE, in->inode, 0, 0, 0); - mds->send_message_client_maybe_open(r, m->get_source_inst()); + mds->send_message_client(r, m->get_source_inst()); } // merge in atime? @@ -1028,6 +1029,8 @@ void Locker::simple_eval(SimpleLock *lock) assert(lock->get_parent()->is_auth()); assert(lock->is_stable()); + if (lock->get_parent()->is_frozen()) return; + // stable -> sync? if (!lock->is_xlocked() && lock->get_state() != LOCK_SYNC && @@ -1562,6 +1565,14 @@ void Locker::scatter_writebehind(ScatterLock *lock) CInode *in = (CInode*)lock->get_parent(); dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl; + // hack: + if (in->is_base()) { + dout(10) << "scatter_writebehind just clearing updated flag for base inode " << *in << dendl; + lock->clear_updated(); + scatter_eval_gather(lock); + return; + } + // journal write-behind. inode_t *pi = in->project_inode(); pi->mtime = in->inode.mtime; // make sure an intermediate version isn't goofing us up @@ -1591,6 +1602,8 @@ void Locker::scatter_eval(ScatterLock *lock) assert(lock->get_parent()->is_auth()); assert(lock->is_stable()); + if (lock->get_parent()->is_frozen()) return; + CInode *in = (CInode*)lock->get_parent(); if (in->has_subtree_root_dirfrag() && !in->is_base()) { // i _should_ be scattered. @@ -2405,7 +2418,7 @@ void Locker::file_eval(FileLock *lock) assert(lock->is_stable()); // not xlocked! - if (lock->is_xlocked()) return; + if (lock->is_xlocked() || lock->get_parent()->is_frozen()) return; // * -> loner? if (!lock->is_rdlocked() && diff --git a/trunk/ceph/mds/LogEvent.cc b/trunk/ceph/mds/LogEvent.cc index 05b4336c52f05..65b0bb2ec1322 100644 --- a/trunk/ceph/mds/LogEvent.cc +++ b/trunk/ceph/mds/LogEvent.cc @@ -19,13 +19,15 @@ // events i know of #include "events/EString.h" -#include "events/ESession.h" #include "events/ESubtreeMap.h" #include "events/EExport.h" #include "events/EImportStart.h" #include "events/EImportFinish.h" #include "events/EFragment.h" +#include "events/ESession.h" +#include "events/ESessions.h" + #include "events/EUpdate.h" #include "events/ESlaveUpdate.h" #include "events/EOpen.h" @@ -55,13 +57,15 @@ LogEvent *LogEvent::decode(bufferlist& bl) switch (type) { case EVENT_STRING: le = new EString; break; - case EVENT_SESSION: le = new ESession; break; case EVENT_SUBTREEMAP: le = new ESubtreeMap; break; case EVENT_EXPORT: le = new EExport; break; case EVENT_IMPORTSTART: le = new EImportStart; break; case EVENT_IMPORTFINISH: le = new EImportFinish; break; case EVENT_FRAGMENT: le = new EFragment; break; + case EVENT_SESSION: le = new ESession; break; + case EVENT_SESSIONS: le = new ESessions; break; + case EVENT_UPDATE: le = new EUpdate; break; case EVENT_SLAVEUPDATE: le = new ESlaveUpdate; break; case EVENT_OPEN: le = new EOpen; break; diff --git a/trunk/ceph/mds/LogEvent.h b/trunk/ceph/mds/LogEvent.h index 8f2f55f342bb3..8d36a1d515c1c 100644 --- a/trunk/ceph/mds/LogEvent.h +++ b/trunk/ceph/mds/LogEvent.h @@ -17,18 +17,20 @@ #define EVENT_STRING 1 -#define EVENT_SESSION 7 #define EVENT_SUBTREEMAP 2 -#define EVENT_EXPORT 30 -#define EVENT_IMPORTSTART 31 -#define EVENT_IMPORTFINISH 32 -#define EVENT_FRAGMENT 33 +#define EVENT_EXPORT 3 +#define EVENT_IMPORTSTART 4 +#define EVENT_IMPORTFINISH 5 +#define EVENT_FRAGMENT 6 -#define EVENT_UPDATE 3 -#define EVENT_SLAVEUPDATE 4 -#define EVENT_OPEN 5 +#define EVENT_SESSION 10 +#define EVENT_SESSIONS 11 -#define EVENT_PURGEFINISH 22 +#define EVENT_UPDATE 20 +#define EVENT_SLAVEUPDATE 21 +#define EVENT_OPEN 22 + +#define EVENT_PURGEFINISH 30 #define EVENT_ANCHOR 40 #define EVENT_ANCHORCLIENT 41 diff --git a/trunk/ceph/mds/LogSegment.h b/trunk/ceph/mds/LogSegment.h index e73f5f8b61b9c..c4cf1d50897ff 100644 --- a/trunk/ceph/mds/LogSegment.h +++ b/trunk/ceph/mds/LogSegment.h @@ -30,7 +30,7 @@ class MDSlaveUpdate; class LogSegment { public: - off_t offset; + off_t offset, end; int num_events; // dirty items @@ -61,7 +61,7 @@ class LogSegment { C_Gather *try_to_expire(MDS *mds); // cons - LogSegment(off_t off) : offset(off), num_events(0), + LogSegment(off_t off) : offset(off), end(off), num_events(0), allocv(0), clientmapv(0), anchortablev(0) { } }; diff --git a/trunk/ceph/mds/MDBalancer.cc b/trunk/ceph/mds/MDBalancer.cc index 7bf6ea4f7eb80..99730db5d6d14 100644 --- a/trunk/ceph/mds/MDBalancer.cc +++ b/trunk/ceph/mds/MDBalancer.cc @@ -1000,7 +1000,7 @@ void MDBalancer::dump_pop_map() // filename last string p; - in->make_path(p); + in->make_path_string(p); myfile << "." << p; if (dir->get_frag() != frag_t()) myfile << "___" << (unsigned)dir->get_frag(); diff --git a/trunk/ceph/mds/MDCache.cc b/trunk/ceph/mds/MDCache.cc index 3417302cc9eb4..48ea6e8fec0cc 100644 --- a/trunk/ceph/mds/MDCache.cc +++ b/trunk/ceph/mds/MDCache.cc @@ -211,7 +211,7 @@ CInode *MDCache::create_root_inode() root->inode.ino = MDS_INO_ROOT; // make it up (FIXME) - root->inode.mode = 0755 | INODE_MODE_DIR; + root->inode.mode = 0755 | S_IFDIR; root->inode.size = 0; root->inode.ctime = root->inode.mtime = g_clock.now(); @@ -219,7 +219,7 @@ CInode *MDCache::create_root_inode() root->inode.nlink = 1; root->inode.layout = g_OSD_MDDirLayout; - root->force_auth = pair(0, CDIR_AUTH_UNKNOWN); + root->inode_auth = pair(0, CDIR_AUTH_UNKNOWN); add_inode( root ); @@ -262,7 +262,7 @@ CInode *MDCache::create_stray_inode(int whose) in->inode.ino = MDS_INO_STRAY(whose); // make it up (FIXME) - in->inode.mode = 0755 | INODE_MODE_DIR; + in->inode.mode = 0755 | S_IFDIR; in->inode.size = 0; in->inode.ctime = in->inode.mtime = g_clock.now(); @@ -2224,7 +2224,7 @@ void MDCache::handle_cache_rejoin_strong(MMDSCacheRejoin *strong) if (!in) in = rejoin_invent_inode(p->first.ino); if (!in->is_dir()) { assert(in->state_test(CInode::STATE_REJOINUNDEF)); - in->inode.mode = INODE_MODE_DIR; + in->inode.mode = S_IFDIR; } dir = in->get_or_open_dirfrag(this, p->first.frag); } else { @@ -2382,7 +2382,7 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack) p != ack->strong_dirfrags.end(); ++p) { CDir *dir = get_dirfrag(p->first); - if (!dir) continue; + if (!dir) continue; // must have trimmed? dir->set_replica_nonce(p->second.nonce); dir->state_clear(CDir::STATE_REJOINING); @@ -2393,7 +2393,7 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack) q != ack->strong_dentries[p->first].end(); ++q) { CDentry *dn = dir->lookup(q->first); - if (!dn) continue; + if (!dn) continue; // must have trimmed? // hmm, did we have the proper linkage here? if (dn->is_null() && @@ -2414,7 +2414,13 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack) else if (!dn->is_null() && q->second.is_null()) { dout(-10) << " had bad linkage for " << *dn << dendl; - assert(0); // hrmpf. unlink should use slave requests to clean this up during resolve. + /* + * this should happen: + * if we're a survivor, any unlink should commit or rollback during + * the resolve stage. + * if we failed, we shouldn't have non-auth leaf dentries at all + */ + assert(0); // uh oh. } dn->set_replica_nonce(q->second.nonce); mds->locker->rejoin_set_state(&dn->lock, q->second.lock, waiters); @@ -2651,7 +2657,7 @@ void MDCache::rejoin_import_cap(CInode *in, int client, inode_caps_reconnect_t& // send REAP // FIXME client session weirdness. - MClientFileCaps *reap = new MClientFileCaps(MClientFileCaps::OP_REAP, + MClientFileCaps *reap = new MClientFileCaps(MClientFileCaps::OP_IMPORT, in->inode, in->client_caps[client].get_last_seq(), in->client_caps[client].pending(), @@ -2811,8 +2817,8 @@ public: }; /* purge_inode in - * will be called by on unlink or rmdir or truncate - * caller responsible for journaling an appropriate EUpdate + * will be called by on unlink or rmdir or truncate or purge + * caller responsible for journaling a matching EUpdate */ void MDCache::purge_inode(CInode *in, off_t newsize, off_t oldsize, LogSegment *ls) { @@ -2886,6 +2892,10 @@ void MDCache::purge_inode_finish_2(CInode *in, off_t newsize, off_t oldsize) waiting_for_purge.erase(in); finish_contexts(ls, 0); } + + // done with inode? + if (in->get_num_ref() == 0) + remove_inode(in); } void MDCache::add_recovered_purge(CInode *in, off_t newsize, off_t oldsize, LogSegment *ls) @@ -2926,6 +2936,11 @@ void MDCache::start_recovered_purges() // cache trimming +/* + * note: only called while MDS is active or stopping... NOT during recovery. + * however, we may expire a replica whose authority is recovering. + * + */ bool MDCache::trim(int max) { // trim LRU @@ -3311,7 +3326,8 @@ void MDCache::handle_cache_expire(MCacheExpire *m) int nonce = it->second; if (!in) { - dout(0) << " inode expire on " << it->first << " from " << from << ", don't have it" << dendl; + dout(0) << " inode expire on " << it->first << " from " << from + << ", don't have it" << dendl; assert(in); } assert(in->is_auth()); @@ -3319,13 +3335,15 @@ void MDCache::handle_cache_expire(MCacheExpire *m) // check nonce if (nonce == in->get_replica_nonce(from)) { // remove from our cached_by - dout(7) << " inode expire on " << *in << " from mds" << from << " cached_by was " << in->get_replicas() << dendl; + dout(7) << " inode expire on " << *in << " from mds" << from + << " cached_by was " << in->get_replicas() << dendl; inode_remove_replica(in, from); } else { // this is an old nonce, ignore expire. dout(7) << " inode expire on " << *in << " from mds" << from - << " with old nonce " << nonce << " (current " << in->get_replica_nonce(from) << "), dropping" + << " with old nonce " << nonce + << " (current " << in->get_replica_nonce(from) << "), dropping" << dendl; assert(in->get_replica_nonce(from) > nonce); } @@ -3339,7 +3357,8 @@ void MDCache::handle_cache_expire(MCacheExpire *m) int nonce = it->second; if (!dir) { - dout(0) << " dir expire on " << it->first << " from " << from << ", don't have it" << dendl; + dout(0) << " dir expire on " << it->first << " from " << from + << ", don't have it" << dendl; assert(dir); } assert(dir->is_auth()); @@ -3370,7 +3389,8 @@ void MDCache::handle_cache_expire(MCacheExpire *m) CDir *dir = diri->get_dirfrag(pd->first.frag); if (!dir) { - dout(0) << " dn expires on " << pd->first << " from " << from << ", must have refragmented" << dendl; + dout(0) << " dn expires on " << pd->first << " from " << from + << ", must have refragmented" << dendl; } else { assert(dir->is_auth()); } @@ -3537,6 +3557,12 @@ bool MDCache::shutdown_pass() return false; } + if (!shutdown_export_strays()) { + dout(7) << "waiting for strays to migrate" << dendl; + return false; + } + + // trim cache trim(0); dout(5) << "lru size now " << lru.lru_get_size() << dendl; @@ -3568,6 +3594,10 @@ bool MDCache::shutdown_pass() } } + if (!shutdown_export_caps()) { + dout(7) << "waiting for residual caps to export" << dendl; + return false; + } // subtrees map not empty yet? if (!subtrees.empty()) { @@ -3631,9 +3661,85 @@ bool MDCache::shutdown_pass() return true; } +bool MDCache::shutdown_export_strays() +{ + if (mds->get_nodeid() == 0) return true; + if (!stray) return true; + + bool done = true; + static set exported_strays; + list dfs; + stray->get_dirfrags(dfs); + while (!dfs.empty()) { + CDir *dir = dfs.front(); + dfs.pop_front(); + if (!dir->is_complete()) { + dir->fetch(0); + done = false; + } + + for (CDir::map_t::iterator p = dir->items.begin(); + p != dir->items.end(); + p++) { + CDentry *dn = p->second; + if (dn->is_null()) continue; + done = false; + + // FIXME: we'll deadlock if a rename fails. + if (exported_strays.count(dn->get_inode()->ino()) == 0) { + exported_strays.insert(dn->get_inode()->ino()); + migrate_stray(dn, mds->get_nodeid(), 0); // send to root! + } + } + } + + return done; +} + +bool MDCache::shutdown_export_caps() +{ + // export caps? + // note: this runs more often than it should. + static bool exported_caps = false; + static set exported_caps_in; + if (!exported_caps) { + dout(7) << "searching for caps to export" << dendl; + exported_caps = true; + + list dirq; + for (map >::iterator p = subtrees.begin(); + p != subtrees.end(); + ++p) { + if (exported_caps_in.count(p->first)) continue; + if (p->first->is_auth() || + p->first->is_ambiguous_auth()) + exported_caps = false; // we'll have to try again + else { + dirq.push_back(p->first); + exported_caps_in.insert(p->first); + } + } + while (!dirq.empty()) { + CDir *dir = dirq.front(); + dirq.pop_front(); + for (CDir::map_t::iterator p = dir->items.begin(); + p != dir->items.end(); + ++p) { + CDentry *dn = p->second; + if (!dn->is_primary()) continue; + CInode *in = dn->get_inode(); + if (in->is_dir()) + in->get_nested_dirfrags(dirq); + if (in->is_any_caps() && !in->state_test(CInode::STATE_EXPORTINGCAPS)) + migrator->export_caps(in); + } + } + } + return true; +} @@ -3728,27 +3834,31 @@ Context *MDCache::_get_waiter(MDRequest *mdr, Message *req) } int MDCache::path_traverse(MDRequest *mdr, Message *req, // who - CInode *base, filepath& origpath, // what + filepath& origpath, // what vector& trace, // result bool follow_trailing_symlink, // how int onfail) { assert(mdr || req); - bool null_okay = onfail == MDS_TRAVERSE_DISCOVERXLOCK; - bool noperm = false; - if (onfail == MDS_TRAVERSE_DISCOVER || - onfail == MDS_TRAVERSE_DISCOVERXLOCK) - noperm = true; + bool null_okay = (onfail == MDS_TRAVERSE_DISCOVERXLOCK); + bool noperm = (onfail == MDS_TRAVERSE_DISCOVER || + onfail == MDS_TRAVERSE_DISCOVERXLOCK); // keep a list of symlinks we touch to avoid loops set< pair > symlinks_resolved; // root - CInode *cur = base; - if (!cur) cur = get_root(); + CInode *cur = get_inode(origpath.get_ino()); if (cur == NULL) { - dout(7) << "traverse: i don't have root" << dendl; - open_root(_get_waiter(mdr, req)); + dout(7) << "traverse: opening base ino " << origpath.get_ino() << dendl; + if (origpath.get_ino() == MDS_INO_ROOT) + open_root(_get_waiter(mdr, req)); + else if (MDS_INO_IS_STRAY(origpath.get_ino())) + open_foreign_stray(origpath.get_ino() - MDS_INO_STRAY_OFFSET, _get_waiter(mdr, req)); + else { + assert(0); // hrm.. broken + return -EIO; + } return 1; } @@ -3764,7 +3874,6 @@ int MDCache::path_traverse(MDRequest *mdr, Message *req, // who while (depth < path.depth()) { dout(12) << "traverse: path seg depth " << depth << " = " << path[depth] << dendl; - // ENOTDIR? if (!cur->is_dir()) { dout(7) << "traverse: " << *cur << " not a dir " << dendl; return -ENOTDIR; @@ -4653,7 +4762,7 @@ void MDCache::eval_stray(CDentry *dn) CInode *in = dn->inode; assert(in); - return; // FIXME or test me rather, there is a bug here somewhere! + if (!dn->is_auth()) return; // has to be mine // purge? if (in->inode.nlink == 0) { @@ -4665,19 +4774,33 @@ void MDCache::eval_stray(CDentry *dn) // trivial reintegrate? if (!in->remote_parents.empty()) { CDentry *rlink = *in->remote_parents.begin(); - if (rlink->is_auth() && - rlink->dir->can_auth_pin()) + if (rlink->is_auth() && rlink->dir->can_auth_pin()) reintegrate_stray(dn, rlink); - if (!rlink->is_auth() && - !in->is_ambiguous_auth()) - migrate_stray(dn, rlink->authority().first); + if (!rlink->is_auth() && dn->is_auth()) + migrate_stray(dn, mds->get_nodeid(), rlink->authority().first); } } else { // wait for next use. } } +void MDCache::eval_remote(CDentry *dn) +{ + dout(10) << "eval_remote " << *dn << dendl; + assert(dn->is_remote()); + CInode *in = dn->get_inode(); + if (!in) return; + + // refers to stray? + if (in->get_parent_dn()->get_dir()->get_inode()->is_stray()) { + if (in->is_auth()) + eval_stray(in->get_parent_dn()); + else + migrate_stray(in->get_parent_dn(), in->authority().first, mds->get_nodeid()); + } +} + class C_MDC_PurgeStray : public Context { MDCache *cache; @@ -4721,23 +4844,48 @@ void MDCache::_purge_stray_logged(CDentry *dn, version_t pdv, LogSegment *ls) dn->dir->remove_dentry(dn); // purge+remove inode + in->mark_clean(); purge_inode(in, 0, in->inode.size, ls); - remove_inode(in); } -void MDCache::reintegrate_stray(CDentry *dn, CDentry *rlink) +void MDCache::reintegrate_stray(CDentry *straydn, CDentry *rdn) { - dout(10) << "reintegrate_stray " << *dn << " into " << *rlink << dendl; + dout(10) << "reintegrate_stray " << *straydn << " into " << *rdn << dendl; + // rename it to another mds. + filepath src; + straydn->make_path(src); + filepath dst; + rdn->make_path(dst); + + MClientRequest *req = new MClientRequest(MDS_OP_RENAME, mds->messenger->get_myinst()); + req->set_filepath(src); + req->set_filepath2(dst); + req->set_tid(mds->issue_tid()); + + mds->send_message_mds(req, rdn->authority().first); } -void MDCache::migrate_stray(CDentry *dn, int dest) +void MDCache::migrate_stray(CDentry *dn, int from, int to) { - dout(10) << "migrate_stray to mds" << dest << " " << *dn << dendl; + dout(10) << "migrate_stray from mds" << from << " to mds" << to + << " " << *dn << " " << *dn->inode << dendl; + + // rename it to another mds. + string dname; + dn->get_inode()->name_stray_dentry(dname); + filepath src(dname, MDS_INO_STRAY(from)); + filepath dst(dname, MDS_INO_STRAY(to)); + MClientRequest *req = new MClientRequest(MDS_OP_RENAME, mds->messenger->get_myinst()); + req->set_filepath(src); + req->set_filepath2(dst); + req->set_tid(mds->issue_tid()); + + mds->send_message_mds(req, to); } @@ -5136,9 +5284,9 @@ void MDCache::handle_discover(MDiscover *dis) // xlocked dentry? // ...always block on non-tail items (they are unrelated) // ...allow xlocked tail disocvery _only_ if explicitly requested + bool tailitem = (dis->get_want().depth() == 0) || (i == dis->get_want().depth() - 1); if (dn->lock.is_xlocked()) { // is this the last (tail) item in the discover traversal? - bool tailitem = (dis->get_want().depth() == 0) || (i == dis->get_want().depth() - 1); if (tailitem && dis->wants_xlocked()) { dout(7) << "handle_discover allowing discovery of xlocked tail " << *dn << dendl; } else { @@ -5150,9 +5298,10 @@ void MDCache::handle_discover(MDiscover *dis) } // frozen inode? - if (dn->is_primary() && - dn->inode->is_frozen()) { - if (reply->is_empty()) { + if (dn->is_primary() && dn->inode->is_frozen()) { + if (tailitem && dis->wants_xlocked()) { + dout(7) << "handle_discover allowing discovery of frozen tail " << *dn->inode << dendl; + } else if (reply->is_empty()) { dout(7) << *dn->inode << " is frozen, empty reply, waiting" << dendl; dn->inode->add_waiter(CDir::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, dis)); delete reply; @@ -5213,7 +5362,6 @@ void MDCache::handle_discover_reply(MDiscoverReply *m) // add base inode cur = add_replica_inode(m->get_inode(0), NULL, finished); - cur->force_auth = pair(m->get_source().num(), CDIR_AUTH_UNKNOWN); dout(7) << "discover_reply got base inode " << *cur << dendl; @@ -5457,12 +5605,20 @@ CInode *MDCache::add_replica_inode(CInodeDiscover& dis, CDentry *dn, listis_base()) { + if (in->ino() == MDS_INO_ROOT) + in->inode_auth.first = 0; + else if (MDS_INO_IS_STRAY(in->ino())) + in->inode_auth.first = in->ino() - MDS_INO_STRAY_OFFSET; + else + assert(0); + } + dout(10) << "add_replica_inode added " << *in << dendl; if (dn && dn->is_null()) dn->dir->link_primary_inode(dn, in); } else { dis.update_inode(in); - dout(10) << "add_replica_inode added " << *in << dendl; + dout(10) << "add_replica_inode had " << *in << dendl; } if (dn) { @@ -5485,7 +5641,6 @@ CDentry *MDCache::add_replica_stray(bufferlist &bl, CInode *in, int from) CInodeDiscover indis; indis._decode(bl, off); CInode *strayin = add_replica_inode(indis, NULL, finished); - strayin->force_auth = pair(from, CDIR_AUTH_UNKNOWN); dout(15) << "strayin " << *strayin << dendl; // dir @@ -5576,7 +5731,7 @@ int MDCache::send_dir_updates(CDir *dir, bool bcast) dout(7) << "sending dir_update on " << *dir << " bcast " << bcast << " to " << who << dendl; - string path; + filepath path; dir->inode->make_path(path); int whoami = mds->get_nodeid(); @@ -5616,7 +5771,7 @@ void MDCache::handle_dir_update(MDirUpdate *m) dout(5) << "trying discover on dir_update for " << path << dendl; int r = path_traverse(0, m, - 0, path, trace, true, + path, trace, true, MDS_TRAVERSE_DISCOVER); if (r > 0) return; @@ -5680,6 +5835,14 @@ void MDCache::handle_dentry_unlink(MDentryUnlink *m) dn->dir->unlink_inode(dn); assert(straydn); straydn->dir->link_primary_inode(straydn, in); + + // send caps to auth (if we're not already) + if (in->is_any_caps() && + !in->state_test(CInode::STATE_EXPORTINGCAPS)) + migrator->export_caps(in); + + lru.lru_bottouch(straydn); // move stray to end of lru + } else { assert(dn->is_remote()); dn->dir->unlink_inode(dn); diff --git a/trunk/ceph/mds/MDCache.h b/trunk/ceph/mds/MDCache.h index 86e3b894c6c8d..35c324f150686 100644 --- a/trunk/ceph/mds/MDCache.h +++ b/trunk/ceph/mds/MDCache.h @@ -132,6 +132,9 @@ struct MDRequest { version_t inode_import_v; CInode* destdn_was_remote_inode; bool was_link_merge; + + map imported_client_map; + map > cap_imports; // called when slave commits or aborts Context *slave_commit; @@ -493,6 +496,8 @@ public: void shutdown_start(); void shutdown_check(); bool shutdown_pass(); + bool shutdown_export_strays(); + bool shutdown_export_caps(); bool shutdown(); // clear cache (ie at shutodwn) bool did_shutdown_log_cap; @@ -579,8 +584,7 @@ public: CDentry *get_or_create_stray_dentry(CInode *in); Context *_get_waiter(MDRequest *mdr, Message *req); - int path_traverse(MDRequest *mdr, Message *req, - CInode *base, filepath& path, + int path_traverse(MDRequest *mdr, Message *req, filepath& path, vector& trace, bool follow_trailing_sym, int onfail); bool path_is_mine(filepath& path); @@ -619,12 +623,13 @@ protected: // -- stray -- public: void eval_stray(CDentry *dn); + void eval_remote(CDentry *dn); protected: void _purge_stray(CDentry *dn); void _purge_stray_logged(CDentry *dn, version_t pdv, LogSegment *ls); friend class C_MDC_PurgeStray; void reintegrate_stray(CDentry *dn, CDentry *rlink); - void migrate_stray(CDentry *dn, int dest); + void migrate_stray(CDentry *dn, int src, int dest); // == messages == diff --git a/trunk/ceph/mds/MDLog.cc b/trunk/ceph/mds/MDLog.cc index eeea99c721751..52c50ff82a4fc 100644 --- a/trunk/ceph/mds/MDLog.cc +++ b/trunk/ceph/mds/MDLog.cc @@ -170,9 +170,11 @@ void MDLog::submit_entry( LogEvent *le, Context *c ) // journal it. journaler->append_entry(bl); // bl is destroyed. } - + + le->_segment->end = journaler->get_write_pos(); + delete le; - + if (logger) { logger->inc("evadd"); logger->set("ev", num_events); @@ -343,6 +345,9 @@ void MDLog::_expired(LogSegment *ls) if (!capped && ls == get_current_segment()) { dout(5) << "_expired not expiring " << ls->offset << ", last one and !capped" << dendl; + } else if (ls->end > journaler->get_write_ack_pos()) { + dout(5) << "_expired not expiring " << ls->offset << ", not fully flushed yet, ack " + << journaler->get_write_ack_pos() << " < end " << ls->end << dendl; } else { // expired. expired_segments.insert(ls); @@ -467,6 +472,7 @@ void MDLog::_replay_thread() << " : " << *le << dendl; le->_segment = get_current_segment(); // replay may need this le->_segment->num_events++; + le->_segment->end = journaler->get_read_pos(); num_events++; le->replay(mds); diff --git a/trunk/ceph/mds/MDLog.h b/trunk/ceph/mds/MDLog.h index f7bdcd21a5303..c958585b86a48 100644 --- a/trunk/ceph/mds/MDLog.h +++ b/trunk/ceph/mds/MDLog.h @@ -163,6 +163,9 @@ public: void submit_entry( LogEvent *e, Context *c = 0 ); void wait_for_sync( Context *c ); void flush(); + bool is_flushed() { + return unflushed == 0; + } private: class C_MaybeExpiredSegment : public Context { diff --git a/trunk/ceph/mds/MDS.cc b/trunk/ceph/mds/MDS.cc index af32250a55dc1..4e0ce75bf8751 100644 --- a/trunk/ceph/mds/MDS.cc +++ b/trunk/ceph/mds/MDS.cc @@ -76,6 +76,8 @@ MDS::MDS(int whoami, Messenger *m, MonMap *mm) : this->whoami = whoami; + last_tid = 0; + monmap = mm; messenger = m; @@ -250,7 +252,8 @@ void MDS::send_message_mds(Message *m, int mds) void MDS::forward_message_mds(Message *req, int mds) { // client request? - if (req->get_type() == CEPH_MSG_CLIENT_REQUEST) { + if (req->get_type() == CEPH_MSG_CLIENT_REQUEST && + ((MClientRequest*)req)->get_client_inst().name.is_client()) { MClientRequest *creq = (MClientRequest*)req; creq->inc_num_fwd(); // inc forward counter @@ -259,8 +262,13 @@ void MDS::forward_message_mds(Message *req, int mds) creq->get_client_inst()); if (!creq->is_idempotent()) { + /* don't actually forward if non-idempotent! + * client has to do it. although the MDS will ignore duplicate requests, + * the affected metadata may migrate, in which case the new authority + * won't have the metareq_id in the completed request map. + */ delete req; - return; // don't actually forward if non-idempotent! client has to do it. + return; } } @@ -285,44 +293,6 @@ void MDS::send_message_client(Message *m, entity_inst_t clientinst) } -class C_MDS_SendMessageClientSession : public Context { - MDS *mds; - Message *msg; - entity_inst_t clientinst; -public: - C_MDS_SendMessageClientSession(MDS *md, Message *ms, entity_inst_t& ci) : - mds(md), msg(ms), clientinst(ci) {} - void finish(int r) { - mds->clientmap.open_session(clientinst); - mds->send_message_client(msg, clientinst.name.num()); - } -}; - -void MDS::send_message_client_maybe_opening(Message *m, int c) -{ - send_message_client_maybe_open(m, clientmap.get_inst(c)); -} - -void MDS::send_message_client_maybe_open(Message *m, entity_inst_t clientinst) -{ - // FIXME - // _most_ ppl shoudl check for a client session, since migration may call this, - // start opening, and then e.g. locker sends something else (through non-maybe_open - // version) - int client = clientinst.name.num(); - if (!clientmap.have_session(client)) { - // no session! - dout(10) << "send_message_client opening session with " << clientinst << dendl; - clientmap.add_opening(client); - mdlog->submit_entry(new ESession(clientinst, true, clientmap.inc_projected()), - new C_MDS_SendMessageClientSession(this, m, clientinst)); - } else { - // we have a session. - send_message_client(m, clientinst); - } -} - - int MDS::init(bool standby) { @@ -1165,7 +1135,7 @@ void MDS::my_dispatch(Message *m) // HACK FOR NOW - if (is_active()) { + if (is_active() || is_stopping()) { // flush log to disk after every op. for now. mdlog->flush(); diff --git a/trunk/ceph/mds/MDS.h b/trunk/ceph/mds/MDS.h index 7dcd921d05f4e..400bdb130281e 100644 --- a/trunk/ceph/mds/MDS.h +++ b/trunk/ceph/mds/MDS.h @@ -119,6 +119,8 @@ class MDS : public Dispatcher { map peer_mdsmap_epoch; + tid_t last_tid; // for mds-initiated requests (e.g. stray rename) + public: void wait_for_active(Context *c) { waiting_for_active.push_back(c); @@ -143,6 +145,8 @@ class MDS : public Dispatcher { void set_want_state(int s); + tid_t issue_tid() { return ++last_tid; } + // -- waiters -- list finished_queue; @@ -223,8 +227,6 @@ class MDS : public Dispatcher { void send_message_client(Message *m, int client); void send_message_client(Message *m, entity_inst_t clientinst); - void send_message_client_maybe_opening(Message *m, int); - void send_message_client_maybe_open(Message *m, entity_inst_t clientinst); // start up, shutdown diff --git a/trunk/ceph/mds/Migrator.cc b/trunk/ceph/mds/Migrator.cc index 1c443c7bf6f79..78cd7fcfcbcc0 100644 --- a/trunk/ceph/mds/Migrator.cc +++ b/trunk/ceph/mds/Migrator.cc @@ -20,6 +20,7 @@ #include "Migrator.h" #include "Locker.h" #include "Migrator.h" +#include "Server.h" #include "MDBalancer.h" #include "MDLog.h" @@ -31,6 +32,7 @@ #include "events/EExport.h" #include "events/EImportStart.h" #include "events/EImportFinish.h" +#include "events/ESessions.h" #include "msg/Messenger.h" @@ -47,6 +49,13 @@ #include "messages/MExportDirNotifyAck.h" #include "messages/MExportDirFinish.h" +#include "messages/MExportCaps.h" +#include "messages/MExportCapsAck.h" + + + + + #include "config.h" #define dout(l) if (l<=g_conf.debug || l <= g_conf.debug_mds || l <= g_conf.debug_mds_migrator) *_dout << dbeginl << g_clock.now() << " mds" << mds->get_nodeid() << ".migrator " @@ -92,6 +101,14 @@ void Migrator::dispatch(Message *m) handle_export_notify((MExportDirNotify*)m); break; + // caps + case MSG_MDS_EXPORTCAPS: + handle_export_caps((MExportCaps*)m); + break; + case MSG_MDS_EXPORTCAPSACK: + handle_export_caps_ack((MExportCapsAck*)m); + break; + default: assert(0); } @@ -758,12 +775,32 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m) } +class C_M_ExportGo : public Context { + Migrator *migrator; + CDir *dir; +public: + C_M_ExportGo(Migrator *m, CDir *d) : migrator(m), dir(d) {} + void finish(int r) { + migrator->export_go_synced(dir); + } +}; + void Migrator::export_go(CDir *dir) -{ +{ assert(export_peer.count(dir)); int dest = export_peer[dir]; dout(7) << "export_go " << *dir << " to " << dest << dendl; + // first sync log to flush out e.g. any cap imports + mds->mdlog->wait_for_sync(new C_M_ExportGo(this, dir)); +} + +void Migrator::export_go_synced(CDir *dir) +{ + assert(export_peer.count(dir)); + int dest = export_peer[dir]; + dout(7) << "export_go_synced " << *dir << " to " << dest << dendl; + cache->show_subtrees(); export_warning_ack_waiting.erase(dir); @@ -830,6 +867,20 @@ void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state, ::_encode_simple(in->inode.ino, enc_state); in->encode_export(enc_state); + // caps + encode_export_inode_caps(in, enc_state, exported_client_map); +} + +void Migrator::encode_export_inode_caps(CInode *in, bufferlist& bl, + map& exported_client_map) +{ + // encode caps + map cap_map; + in->export_client_caps(cap_map); + ::_encode_simple(cap_map, bl); + + in->state_set(CInode::STATE_EXPORTINGCAPS); + // make note of clients named by exported capabilities for (map::iterator it = in->client_caps.begin(); it != in->client_caps.end(); @@ -837,27 +888,33 @@ void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state, exported_client_map[it->first] = mds->clientmap.get_inst(it->first); } -void Migrator::finish_export_inode(CInode *in, utime_t now, list& finished) +void Migrator::finish_export_inode_caps(CInode *in) { - dout(12) << "finish_export_inode " << *in << dendl; + in->state_clear(CInode::STATE_EXPORTINGCAPS); - in->finish_export(now); - - // tell (all) clients about migrating caps.. mark STALE + // tell (all) clients about migrating caps.. for (map::iterator it = in->client_caps.begin(); it != in->client_caps.end(); it++) { dout(7) << "finish_export_inode telling client" << it->first - << " stale caps on " << *in << dendl; - MClientFileCaps *m = new MClientFileCaps(MClientFileCaps::OP_STALE, + << " exported caps on " << *in << dendl; + MClientFileCaps *m = new MClientFileCaps(MClientFileCaps::OP_EXPORT, in->inode, it->second.get_last_seq(), it->second.pending(), it->second.wanted()); - entity_inst_t inst = mds->clientmap.get_inst(it->first); - mds->send_message_client_maybe_open(m, inst); + mds->send_message_client(m, it->first); } in->clear_client_caps(); +} + +void Migrator::finish_export_inode(CInode *in, utime_t now, list& finished) +{ + dout(12) << "finish_export_inode " << *in << dendl; + + in->finish_export(now); + + finish_export_inode_caps(in); // relax locks? if (!in->is_replicated()) @@ -1340,9 +1397,7 @@ void Migrator::handle_export_discover(MExportDirDiscover *m) // must discover it! filepath fpath(m->get_path()); vector trace; - int r = cache->path_traverse(0, m, - 0, fpath, trace, true, - MDS_TRAVERSE_DISCOVER); + int r = cache->path_traverse(0, m, fpath, trace, true, MDS_TRAVERSE_DISCOVER); if (r > 0) return; // wait if (r < 0) { dout(7) << "handle_export_discover_2 failed to discover or not dir " << m->get_path() << ", NAK" << dendl; @@ -1568,11 +1623,13 @@ class C_MDS_ImportDirLoggedStart : public Context { CDir *dir; int from; public: + map imported_client_map; + C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, int f) : migrator(m), dir(d), from(f) { } void finish(int r) { - migrator->import_logged_start(dir, from); + migrator->import_logged_start(dir, from, imported_client_map); } }; @@ -1587,6 +1644,8 @@ void Migrator::handle_export_dir(MExportDir *m) cache->show_subtrees(); + C_MDS_ImportDirLoggedStart *onlogged = new C_MDS_ImportDirLoggedStart(this, dir, m->get_source().num()); + // start the journal entry EImportStart *le = new EImportStart(dir->dirfrag(), m->get_bounds()); le->metablob.add_dir_context(dir); @@ -1595,9 +1654,11 @@ void Migrator::handle_export_dir(MExportDir *m) cache->adjust_subtree_auth(dir, mds->get_nodeid(), oldauth); // add this crap to my cache - map imported_client_map; bufferlist::iterator blp = m->get_dirstate().begin(); - ::_decode_simple(imported_client_map, blp); + + // new client sessions, open these after we journal + ::_decode_simple(onlogged->imported_client_map, blp); + mds->server->prepare_force_open_sessions(onlogged->imported_client_map); int num_imported_inodes = 0; while (!blp.end()) { @@ -1606,12 +1667,15 @@ void Migrator::handle_export_dir(MExportDir *m) oldauth, dir, // import root le, - imported_client_map, mds->mdlog->get_current_segment(), + import_caps[dir], import_updated_scatterlocks[dir]); } dout(10) << " " << m->get_bounds().size() << " imported bounds" << dendl; + // include imported sessions in EImportStart + le->client_map.claim(m->get_dirstate()); + // include bounds in EImportStart set import_bounds; cache->get_subtree_bounds(dir, import_bounds); @@ -1626,8 +1690,7 @@ void Migrator::handle_export_dir(MExportDir *m) dout(7) << "handle_export_dir did " << *dir << dendl; // log it - mds->mdlog->submit_entry(le, - new C_MDS_ImportDirLoggedStart(this, dir, m->get_source().num())); + mds->mdlog->submit_entry(le, onlogged); // note state import_state[dir->dirfrag()] = IMPORT_LOGGINGSTART; @@ -1736,9 +1799,23 @@ void Migrator::import_reverse(CDir *dir) } } + // reexport caps + for (map >::iterator p = import_caps[dir].begin(); + p != import_caps[dir].end(); + ++p) { + CInode *in = p->first; + /* + * bleh.. just export all caps for this inode. the auth mds + * will pick them up during recovery. + */ + map cap_map; // throw this away + in->export_client_caps(cap_map); + finish_export_inode_caps(in); + } + // log our failure mds->mdlog->submit_entry(new EImportFinish(dir, false)); // log failure - + // bystanders? if (import_bystanders[dir].empty()) { dout(7) << "no bystanders, finishing reverse now" << dendl; @@ -1787,6 +1864,7 @@ void Migrator::import_reverse_final(CDir *dir) import_bystanders.erase(dir); import_bound_ls.erase(dir); import_updated_scatterlocks.erase(dir); + import_caps.erase(dir); // send pending import_maps? mds->mdcache->maybe_send_pending_resolves(); @@ -1796,13 +1874,25 @@ void Migrator::import_reverse_final(CDir *dir) } -void Migrator::import_logged_start(CDir *dir, int from) + + +void Migrator::import_logged_start(CDir *dir, int from, + map& imported_client_map) { dout(7) << "import_logged " << *dir << dendl; // note state import_state[dir->dirfrag()] = IMPORT_ACKING; + // force open client sessions and finish cap import + mds->server->finish_force_open_sessions(imported_client_map); + + for (map >::iterator p = import_caps[dir].begin(); + p != import_caps[dir].end(); + ++p) { + finish_import_inode_caps(p->first, from, p->second); + } + // send notify's etc. dout(7) << "sending ack for " << *dir << " to old auth mds" << from << dendl; mds->send_message_mds(new MExportDirAck(dir->dirfrag()), from); @@ -1847,6 +1937,7 @@ void Migrator::import_finish(CDir *dir) import_peer.erase(dir->dirfrag()); import_bystanders.erase(dir); import_bound_ls.erase(dir); + import_caps.erase(dir); import_updated_scatterlocks.erase(dir); // process delayed expires @@ -1871,8 +1962,8 @@ void Migrator::import_finish(CDir *dir) void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp, int oldauth, - map& imported_client_map, LogSegment *ls, + map >& cap_imports, list& updated_scatterlocks) { dout(15) << "decode_import_inode on " << *dn << dendl; @@ -1890,9 +1981,11 @@ void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp, int o } // state after link -- or not! -sage - set merged_client_caps; - in->decode_import(blp, merged_client_caps, ls); - + in->decode_import(blp, ls); // cap imports are noted for later action + + // caps + decode_import_inode_caps(in, blp, cap_imports); + // link before state -- or not! -sage if (dn->inode != in) { assert(!dn->inode); @@ -1919,32 +2012,53 @@ void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp, int o // adjust replica list //assert(!in->is_replica(oldauth)); // not true on failed export - in->add_replica( oldauth, CInode::EXPORT_NONCE ); + in->add_replica(oldauth, CInode::EXPORT_NONCE); if (in->is_replica(mds->get_nodeid())) in->remove_replica(mds->get_nodeid()); - // caps - for (set::iterator it = merged_client_caps.begin(); - it != merged_client_caps.end(); +} + +void Migrator::decode_import_inode_caps(CInode *in, + bufferlist::iterator &blp, + map >& cap_imports) +{ + map cap_map; + ::_decode_simple(cap_map, blp); + if (!cap_map.empty()) { + cap_imports[in].swap(cap_map); + in->get(CInode::PIN_IMPORTINGCAPS); + } +} + +void Migrator::finish_import_inode_caps(CInode *in, int from, + map &cap_map) +{ + assert(!cap_map.empty()); + + set new_caps; + in->merge_client_caps(cap_map, new_caps); + in->put(CInode::PIN_IMPORTINGCAPS); + + for (set::iterator it = new_caps.begin(); + it != new_caps.end(); it++) { - dout(0) << "merged caps for client" << *it << " on " << *in << dendl; - MClientFileCaps *caps = new MClientFileCaps(MClientFileCaps::OP_REAP, + dout(0) << "finish_import_inode_caps for client" << *it << " on " << *in << dendl; + MClientFileCaps *caps = new MClientFileCaps(MClientFileCaps::OP_IMPORT, in->inode, - in->client_caps[*it].get_last_seq(), - in->client_caps[*it].pending(), - in->client_caps[*it].wanted()); - caps->set_mds( oldauth ); // reap from whom? - mds->send_message_client_maybe_open(caps, imported_client_map[*it]); + in->client_caps[*it].get_last_seq(), + in->client_caps[*it].pending(), + in->client_caps[*it].wanted()); + caps->set_mds(from); // from whom? + mds->send_message_client(caps, *it); } } - int Migrator::decode_import_dir(bufferlist::iterator& blp, int oldauth, CDir *import_root, EImportStart *le, - map& imported_client_map, LogSegment *ls, + map >& cap_imports, list& updated_scatterlocks) { // set up dir @@ -2039,7 +2153,7 @@ int Migrator::decode_import_dir(bufferlist::iterator& blp, } else if (icode == 'I') { // inode - decode_import_inode(dn, blp, oldauth, imported_client_map, ls, updated_scatterlocks); + decode_import_inode(dn, blp, oldauth, ls, cap_imports, updated_scatterlocks); } // add dentry to journal entry @@ -2102,6 +2216,93 @@ void Migrator::handle_export_notify(MExportDirNotify *m) +/** cap exports **/ + + + +void Migrator::export_caps(CInode *in) +{ + int dest = in->authority().first; + dout(7) << "export_caps to mds" << dest << " " << *in << dendl; + + assert(in->is_any_caps()); + assert(!in->is_auth()); + assert(!in->is_ambiguous_auth()); + assert(!in->state_test(CInode::STATE_EXPORTINGCAPS)); + + MExportCaps *ex = new MExportCaps; + ex->ino = in->ino(); + + encode_export_inode_caps(in, ex->cap_bl, ex->client_map); + + mds->send_message_mds(ex, dest); +} + +void Migrator::handle_export_caps_ack(MExportCapsAck *ack) +{ + CInode *in = cache->get_inode(ack->ino); + assert(in); + dout(10) << "handle_export_caps_ack " << *ack << " from " << ack->get_source() + << " on " << *in + << dendl; + + finish_export_inode_caps(in); + delete ack; +} + + +class C_M_LoggedImportCaps : public Context { + Migrator *migrator; + CInode *in; + int from; +public: + map > cap_imports; + + C_M_LoggedImportCaps(Migrator *m, CInode *i, int f) : migrator(m), in(i), from(f) {} + void finish(int r) { + migrator->logged_import_caps(in, from, cap_imports); + } +}; + +void Migrator::handle_export_caps(MExportCaps *ex) +{ + dout(10) << "handle_export_caps " << *ex << " from " << ex->get_source() << dendl; + CInode *in = cache->get_inode(ex->ino); + + assert(in->is_auth()); + /* + * note: i may be frozen, but i won't have been encoded for export (yet)! + * see export_go() vs export_go_synced(). + */ + + C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(this, in, ex->get_source().num()); + ESessions *le = new ESessions(mds->clientmap.inc_projected()); + + // decode new caps + bufferlist::iterator blp = ex->cap_bl.begin(); + decode_import_inode_caps(in, blp, finish->cap_imports); + assert(!finish->cap_imports.empty()); // thus, inode is pinned. + + // journal open client sessions + mds->server->prepare_force_open_sessions(ex->client_map); + le->client_map.swap(ex->client_map); + + mds->mdlog->submit_entry(le, finish); + + delete ex; +} + + +void Migrator::logged_import_caps(CInode *in, + int from, + map >& cap_imports) +{ + dout(10) << "logged_import_caps on " << *in << dendl; + assert(cap_imports.count(in)); + finish_import_inode_caps(in, from, cap_imports[in]); + + mds->send_message_mds(new MExportCapsAck(in->ino()), from); +} diff --git a/trunk/ceph/mds/Migrator.h b/trunk/ceph/mds/Migrator.h index 07a8731868a92..b32a6a1a4f33f 100644 --- a/trunk/ceph/mds/Migrator.h +++ b/trunk/ceph/mds/Migrator.h @@ -41,6 +41,9 @@ class MExportDirNotify; class MExportDirNotifyAck; class MExportDirFinish; +class MExportCaps; +class MExportCapsAck; + class EImportStart; @@ -114,14 +117,7 @@ protected: map > import_bystanders; map > import_bound_ls; map > import_updated_scatterlocks; - - /* - // -- hashing madness -- - multimap unhash_waiting; // nodes i am waiting for UnhashDirAck's from - multimap import_hashed_replicate_waiting; // nodes i am waiting to discover to complete my import of a hashed dir - // maps frozen_dir_ino's to waiting-for-discover ino's. - multimap import_hashed_frozen_waiting; // dirs i froze (for the above) - */ + map > > import_caps; public: @@ -185,10 +181,14 @@ public: void clear_export_queue() { export_queue.clear(); } - - void encode_export_inode(CInode *in, bufferlist& enc_state, + + void encode_export_inode(CInode *in, bufferlist& bl, map& exported_client_map); + void encode_export_inode_caps(CInode *in, bufferlist& bl, + map& exported_client_map); void finish_export_inode(CInode *in, utime_t now, list& finished); + void finish_export_inode_caps(CInode *in); + int encode_export_dir(bufferlist& exportbl, CDir *dir, map& exported_client_map, @@ -200,20 +200,26 @@ public: } void clear_export_proxy_pins(CDir *dir); + void export_caps(CInode *in); + protected: void handle_export_discover_ack(MExportDirDiscoverAck *m); void export_frozen(CDir *dir); void handle_export_prep_ack(MExportDirPrepAck *m); void export_go(CDir *dir); + void export_go_synced(CDir *dir); void export_reverse(CDir *dir); void handle_export_ack(MExportDirAck *m); void export_logged_finish(CDir *dir); void handle_export_notify_ack(MExportDirNotifyAck *m); void export_finish(CDir *dir); + void handle_export_caps_ack(MExportCapsAck *m); + + friend class C_MDC_ExportFreeze; friend class C_MDS_ExportFinishLogged; - + friend class C_M_ExportGo; // importer void handle_export_discover(MExportDirDiscover *m); @@ -223,15 +229,19 @@ public: public: void decode_import_inode(CDentry *dn, bufferlist::iterator& blp, int oldauth, - map& imported_client_map, LogSegment *ls, + map >& cap_imports, list& updated_scatterlocks); + void decode_import_inode_caps(CInode *in, + bufferlist::iterator &blp, + map >& cap_imports); + void finish_import_inode_caps(CInode *in, int from, map &cap_map); int decode_import_dir(bufferlist::iterator& blp, int oldauth, CDir *import_root, EImportStart *le, - map& imported_client_map, LogSegment *ls, + map >& cap_imports, list& updated_scatterlocks); public: @@ -241,19 +251,26 @@ protected: void import_reverse_unfreeze(CDir *dir); void import_reverse_final(CDir *dir); void import_notify_abort(CDir *dir, set& bounds); - void import_logged_start(CDir *dir, int from); + void import_logged_start(CDir *dir, int from, + map &imported_client_map); void handle_export_finish(MExportDirFinish *m); public: void import_finish(CDir *dir); protected: + void handle_export_caps(MExportCaps *m); + void logged_import_caps(CInode *in, + int from, + map >& cap_imports); + + friend class C_MDS_ImportDirLoggedStart; friend class C_MDS_ImportDirLoggedFinish; + friend class C_M_LoggedImportCaps; // bystander void handle_export_notify(MExportDirNotify *m); - }; diff --git a/trunk/ceph/mds/Server.cc b/trunk/ceph/mds/Server.cc index efb566c7cee29..768f9f275ce33 100644 --- a/trunk/ceph/mds/Server.cc +++ b/trunk/ceph/mds/Server.cc @@ -93,7 +93,7 @@ void Server::dispatch(Message *m) } // active? - if (!mds->is_active()) { + if (!mds->is_active() && !mds->is_stopping()) { dout(3) << "not active yet, waiting" << dendl; mds->wait_for_active(new C_MDS_RetryMessage(mds, m)); return; @@ -173,6 +173,7 @@ void Server::handle_client_session(MClientSession *m) // journal it version_t cmapv = mds->clientmap.inc_projected(); + dout(10) << " clientmap v " << mds->clientmap.get_version() << " pv " << cmapv << dendl; mdlog->submit_entry(new ESession(m->get_source_inst(), open, cmapv), new C_MDS_session_finish(mds, m->get_source_inst(), open, cmapv)); delete m; @@ -191,12 +192,15 @@ void Server::_session_logged(entity_inst_t client_inst, bool open, version_t cma if (open) { assert(mds->clientmap.is_opening(from)); mds->clientmap.open_session(client_inst); - } else { - assert(mds->clientmap.is_closing(from)); + } else if (mds->clientmap.is_closing(from)) { mds->clientmap.close_session(from); // purge completed requests from clientmap - mds->clientmap.trim_completed_requests(from, 0); + mds->clientmap.trim_completed_requests(client_inst.name, 0); + } else { + // close must have been canceled (by an import?) ... + assert(!open); + mds->clientmap.noop(); } assert(cmapv == mds->clientmap.get_version()); @@ -208,6 +212,39 @@ void Server::_session_logged(entity_inst_t client_inst, bool open, version_t cma mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE), client_inst); } +void Server::prepare_force_open_sessions(map& cm) +{ + version_t cmapv = mds->clientmap.inc_projected(); + dout(10) << "prepare_force_open_sessions " << cmapv + << " on " << cm.size() << " clients" + << dendl; + for (map::iterator p = cm.begin(); p != cm.end(); ++p) { + mds->clientmap.add_opening(p->first); + } +} + +void Server::finish_force_open_sessions(map& cm) +{ + version_t v = mds->clientmap.get_version(); + dout(10) << "finish_force_open_sessions on " << cm.size() << " clients, v " << v << " -> " << (v+1) << dendl; + for (map::iterator p = cm.begin(); p != cm.end(); ++p) { + if (mds->clientmap.is_closing(p->first)) { + dout(15) << "force_open_sessions canceling close on " << p->second << dendl; + mds->clientmap.remove_closing(p->first); + continue; + } + if (mds->clientmap.have_session(p->first)) { + dout(15) << "force_open_sessions have session " << p->second << dendl; + continue; + } + + dout(10) << "force_open_sessions opening " << p->second << dendl; + mds->clientmap.open_session(p->second); + mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN), p->second); + } + mds->clientmap.set_version(v+1); +} + void Server::terminate_sessions() { @@ -284,7 +321,7 @@ void Server::handle_client_reconnect(MClientReconnect *m) // mark client caps stale. inode_t fake_inode; fake_inode.ino = p->first; - MClientFileCaps *stale = new MClientFileCaps(MClientFileCaps::OP_STALE, + MClientFileCaps *stale = new MClientFileCaps(MClientFileCaps::OP_EXPORT, fake_inode, 0, 0, // doesn't matter. @@ -419,15 +456,24 @@ void Server::reply_request(MDRequest *mdr, MClientReply *reply, CInode *tracei) */ // include trace - if (tracei) { + if (tracei) reply->set_trace_dist( tracei, mds->get_nodeid() ); - } + + reply->set_mdsmap_epoch(mds->mdsmap->get_epoch()); // send reply - messenger->send_message(reply, req->get_client_inst()); + if (req->get_client_inst().name.is_mds()) + delete reply; // mds doesn't need a reply + else + messenger->send_message(reply, req->get_client_inst()); // finish request mdcache->request_finish(mdr); + + if (tracei && + tracei->get_parent_dn() && + tracei->get_parent_dn()->is_remote()) + mdcache->eval_remote(tracei->get_parent_dn()); } @@ -440,12 +486,12 @@ void Server::reply_request(MDRequest *mdr, MClientReply *reply, CInode *tracei) void Server::handle_client_request(MClientRequest *req) { dout(4) << "handle_client_request " << *req << dendl; - int client = req->get_client(); if (logger) logger->inc("hcreq"); - if (!mds->is_active()) { - dout(5) << " not active, discarding client request." << dendl; + if (!mds->is_active() && + !(mds->is_stopping() && req->get_client_inst().name.is_mds())) { + dout(5) << " not active (or stopping+mds), discarding request." << dendl; delete req; return; } @@ -457,12 +503,18 @@ void Server::handle_client_request(MClientRequest *req) } // active session? - if (!mds->clientmap.have_session(client)) { - dout(5) << "no session for client" << client << ", dropping" << dendl; + if (req->get_client_inst().name.is_client() && + !mds->clientmap.have_session(req->get_client_inst().name.num())) { + dout(5) << "no session for " << req->get_client_inst().name << ", dropping" << dendl; delete req; return; } + // old mdsmap? + if (req->get_mdsmap_epoch() < mds->mdsmap->get_epoch()) { + // send it? hrm, this isn't ideal; they may get a lot of copies if + // they have a high request rate. + } // okay, i want CInode *ref = 0; @@ -479,7 +531,7 @@ void Server::handle_client_request(MClientRequest *req) // trim completed_request list if (req->get_oldest_client_tid() > 0) { dout(15) << " oldest_client_tid=" << req->get_oldest_client_tid() << dendl; - mds->clientmap.trim_completed_requests(client, + mds->clientmap.trim_completed_requests(req->get_client_inst().name, req->get_oldest_client_tid()); } @@ -1008,7 +1060,7 @@ CDir *Server::traverse_to_auth_dir(MDRequest *mdr, vector &trace, file // traverse to parent dir int r = mdcache->path_traverse(mdr, mdr->client_request, - 0, refpath, trace, true, + refpath, trace, true, MDS_TRAVERSE_FORWARD); if (r > 0) return 0; // delayed if (r < 0) { @@ -1019,7 +1071,7 @@ CDir *Server::traverse_to_auth_dir(MDRequest *mdr, vector &trace, file // open inode CInode *diri; if (trace.empty()) - diri = mdcache->get_root(); + diri = mdcache->get_inode(refpath.get_ino()); else diri = mdcache->get_dentry_inode(trace[trace.size()-1], mdr); if (!diri) @@ -1048,7 +1100,7 @@ CInode* Server::rdlock_path_pin_ref(MDRequest *mdr, bool want_auth) filepath refpath = req->get_filepath(); vector trace; int r = mdcache->path_traverse(mdr, req, - 0, refpath, + refpath, trace, req->follow_trailing_symlink(), MDS_TRAVERSE_FORWARD); if (r > 0) return false; // delayed @@ -1258,7 +1310,7 @@ version_t Server::predirty_dn_diri(MDRequest *mdr, CDentry *dn, EMetaBlob *blob) version_t dirpv = 0; CInode *diri = dn->dir->inode; - if (diri->is_root()) return 0; + if (diri->is_base()) return 0; if (diri->is_auth()) { assert(mdr->wrlocks.count(&diri->dirlock)); @@ -1590,9 +1642,6 @@ void Server::handle_client_readdir(MDRequest *mdr) } assert(in); - - assert(in); - dout(12) << "including inode " << *in << dendl; // add this dentry + inodeinfo @@ -1679,8 +1728,8 @@ void Server::handle_client_mknod(MDRequest *mdr) // it's a file. newi->inode.rdev = req->head.args.mknod.rdev; newi->inode.mode = req->head.args.mknod.mode; - newi->inode.mode &= ~INODE_TYPE_MASK; - newi->inode.mode |= INODE_MODE_FILE; + newi->inode.mode &= ~S_IFMT; + newi->inode.mode |= S_IFREG; newi->inode.version = dn->pre_dirty() - 1; // prepare finisher @@ -1714,8 +1763,8 @@ void Server::handle_client_mkdir(MDRequest *mdr) // it's a directory. newi->inode.mode = req->head.args.mkdir.mode; - newi->inode.mode &= ~INODE_TYPE_MASK; - newi->inode.mode |= INODE_MODE_DIR; + newi->inode.mode &= ~S_IFMT; + newi->inode.mode |= S_IFDIR; newi->inode.layout = g_OSD_MDDirLayout; newi->inode.version = dn->pre_dirty() - 1; @@ -1770,9 +1819,9 @@ void Server::handle_client_symlink(MDRequest *mdr) assert(newi); // it's a symlink - newi->inode.mode &= ~INODE_TYPE_MASK; - newi->inode.mode |= INODE_MODE_SYMLINK; - newi->symlink = req->get_sarg(); + newi->inode.mode &= ~S_IFMT; + newi->inode.mode |= S_IFLNK; + newi->symlink = req->get_path2(); newi->inode.version = dn->pre_dirty() - 1; // prepare finisher @@ -1799,7 +1848,7 @@ void Server::handle_client_link(MDRequest *mdr) MClientRequest *req = mdr->client_request; dout(7) << "handle_client_link " << req->get_filepath() - << " to " << req->get_sarg() + << " to " << req->get_filepath2() << dendl; // traverse to dest dir, make sure it's ours. @@ -1811,11 +1860,11 @@ void Server::handle_client_link(MDRequest *mdr) dout(7) << "handle_client_link link " << dname << " in " << *dir << dendl; // traverse to link target - filepath targetpath = req->get_sarg(); + filepath targetpath = req->get_filepath2(); dout(7) << "handle_client_link discovering target " << targetpath << dendl; vector targettrace; int r = mdcache->path_traverse(mdr, req, - 0, targetpath, targettrace, false, + targetpath, targettrace, false, MDS_TRAVERSE_DISCOVER); if (r > 0) return; // wait if (targettrace.empty()) r = -EINVAL; @@ -2229,7 +2278,7 @@ void Server::handle_client_unlink(MDRequest *mdr) // traverse to path vector trace; int r = mdcache->path_traverse(mdr, req, - 0, req->get_filepath(), trace, false, + req->get_filepath(), trace, false, MDS_TRAVERSE_FORWARD); if (r > 0) return; if (trace.empty()) r = -EINVAL; // can't unlink root @@ -2659,7 +2708,7 @@ void Server::handle_client_rename(MDRequest *mdr) // traverse to dest dir (not dest) // we do this FIRST, because the rename should occur on the // destdn's auth. - const filepath &destpath = req->get_sarg(); + const filepath &destpath = req->get_filepath2(); const string &destname = destpath.last_dentry(); vector desttrace; CDir *destdir = traverse_to_auth_dir(mdr, desttrace, destpath); @@ -2671,7 +2720,7 @@ void Server::handle_client_rename(MDRequest *mdr) filepath srcpath = req->get_filepath(); vector srctrace; int r = mdcache->path_traverse(mdr, req, - 0, srcpath, srctrace, false, + srcpath, srctrace, false, MDS_TRAVERSE_DISCOVER); if (r > 0) return; if (srctrace.empty()) r = -EINVAL; // can't rename root @@ -2683,6 +2732,7 @@ void Server::handle_client_rename(MDRequest *mdr) dout(10) << " srcdn " << *srcdn << dendl; CInode *srci = mdcache->get_dentry_inode(srcdn, mdr); dout(10) << " srci " << *srci << dendl; + mdr->pin(srci); // -- some sanity checks -- // src == dest? @@ -2741,7 +2791,6 @@ void Server::handle_client_rename(MDRequest *mdr) dout(10) << " destdn " << *destdn << dendl; - // -- locks -- set rdlocks, wrlocks, xlocks; @@ -2901,7 +2950,13 @@ void Server::handle_client_rename(MDRequest *mdr) EUpdate *le = new EUpdate(mdlog, "rename"); le->metablob.add_client_req(mdr->reqid); - _rename_prepare(mdr, &le->metablob, srcdn, destdn, straydn); + _rename_prepare(mdr, &le->metablob, &le->client_map, srcdn, destdn, straydn); + + if (!srcdn->is_auth() && srcdn->is_primary()) { + // importing inode; also journal imported client map + + // ** DER FIXME ** + } // -- commit locally -- C_MDS_rename_finish *fin = new C_MDS_rename_finish(mds, mdr, srcdn, destdn, straydn); @@ -2981,7 +3036,7 @@ void Server::_rename_prepare_witness(MDRequest *mdr, int who, CDentry *srcdn, CD void Server::_rename_prepare(MDRequest *mdr, - EMetaBlob *metablob, + EMetaBlob *metablob, bufferlist *client_map_bl, CDentry *srcdn, CDentry *destdn, CDentry *straydn) { dout(10) << "_rename_prepare " << *mdr << " " << *srcdn << " " << *destdn << dendl; @@ -3047,8 +3102,30 @@ void Server::_rename_prepare(MDRequest *mdr, version_t siv; if (srcdn->is_auth()) siv = srcdn->inode->get_projected_version(); - else + else { siv = mdr->more()->inode_import_v; + + /* import node */ + bufferlist::iterator blp = mdr->more()->inode_import.begin(); + + // imported caps + ::_decode_simple(mdr->more()->imported_client_map, blp); + ::_encode_simple(mdr->more()->imported_client_map, *client_map_bl); + prepare_force_open_sessions(mdr->more()->imported_client_map); + + list updated_scatterlocks; // we clear_updated explicitly below + + mdcache->migrator->decode_import_inode(srcdn, blp, + srcdn->authority().first, + mdr->ls, + mdr->more()->cap_imports, updated_scatterlocks); + srcdn->inode->dirlock.clear_updated(); + + + // hack: force back to !auth and clean, temporarily + srcdn->inode->state_clear(CInode::STATE_AUTH); + srcdn->inode->mark_clean(); + } mdr->more()->pvmap[destdn] = destdn->pre_dirty(siv+1); } metablob->add_primary_dentry(destdn, true, srcdn->inode); @@ -3191,16 +3268,15 @@ void Server::_rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDen // srcdn inode import? if (!srcdn->is_auth() && destdn->is_auth()) { assert(mdr->more()->inode_import.length() > 0); - bufferlist::iterator blp = mdr->more()->inode_import.begin(); - map imported_client_map; - list updated_scatterlocks; // we clear_updated explicitly below - ::_decode_simple(imported_client_map, blp); - mdcache->migrator->decode_import_inode(destdn, blp, - srcdn->authority().first, - imported_client_map, - mdr->ls, - updated_scatterlocks); - destdn->inode->dirlock.clear_updated(); + + // finish cap imports + finish_force_open_sessions(mdr->more()->imported_client_map); + if (mdr->more()->cap_imports.count(destdn->inode)) + mds->mdcache->migrator->finish_import_inode_caps(destdn->inode, srcdn->authority().first, + mdr->more()->cap_imports[destdn->inode]); + + // hack: fix auth bit + destdn->inode->state_set(CInode::STATE_AUTH); } if (destdn->inode->is_auth()) destdn->inode->mark_dirty(mdr->more()->pvmap[destdn], mdr->ls); @@ -3256,13 +3332,13 @@ void Server::handle_slave_rename_prep(MDRequest *mdr) << " " << mdr->slave_request->srcdnpath << " to " << mdr->slave_request->destdnpath << dendl; - + // discover destdn filepath destpath(mdr->slave_request->destdnpath); dout(10) << " dest " << destpath << dendl; vector trace; int r = mdcache->path_traverse(mdr, mdr->slave_request, - 0, destpath, trace, false, + destpath, trace, false, MDS_TRAVERSE_DISCOVERXLOCK); if (r > 0) return; assert(r == 0); // we shouldn't get an error here! @@ -3271,15 +3347,14 @@ void Server::handle_slave_rename_prep(MDRequest *mdr) dout(10) << " destdn " << *destdn << dendl; mdr->pin(destdn); - // discover srcdn filepath srcpath(mdr->slave_request->srcdnpath); dout(10) << " src " << srcpath << dendl; r = mdcache->path_traverse(mdr, mdr->slave_request, - 0, srcpath, trace, false, + srcpath, trace, false, MDS_TRAVERSE_DISCOVERXLOCK); if (r > 0) return; - assert(r == 0); // we shouldn't get an error here! + assert(r == 0); CDentry *srcdn = trace[trace.size()-1]; dout(10) << " srcdn " << *srcdn << dendl; @@ -3365,7 +3440,8 @@ void Server::handle_slave_rename_prep(MDRequest *mdr) } // commit case - _rename_prepare(mdr, &le->commit, srcdn, destdn, straydn); + bufferlist blah; + _rename_prepare(mdr, &le->commit, &blah, srcdn, destdn, straydn); mdlog->submit_entry(le, new C_MDS_SlaveRenamePrep(this, mdr, srcdn, destdn, straydn)); } else { @@ -3373,8 +3449,9 @@ void Server::handle_slave_rename_prep(MDRequest *mdr) dout(10) << "not journaling, i'm not auth for anything, and srci isn't open" << dendl; // prepare anyway; this may twiddle dir_auth - EMetaBlob blah; - _rename_prepare(mdr, &blah, srcdn, destdn, straydn); + EMetaBlob blob; + bufferlist blah; + _rename_prepare(mdr, &blob, &blah, srcdn, destdn, straydn); _logged_slave_rename(mdr, srcdn, destdn, straydn); } } @@ -3672,11 +3749,16 @@ void Server::handle_client_open(MDRequest *mdr) if (!cur) return; // regular file? - if ((cur->inode.mode & INODE_TYPE_MASK) != INODE_MODE_FILE) { - dout(7) << "not a regular file " << *cur << dendl; + if (!cur->inode.is_file() && !cur->inode.is_dir()) { + dout(7) << "not a file or dir " << *cur << dendl; reply_request(mdr, -EINVAL); // FIXME what error do we want? return; } + // can only open a dir rdonly, no flags. + if (cur->inode.is_dir() && (cmode != FILE_MODE_R || flags != 0)) { + reply_request(mdr, -EINVAL); + return; + } // hmm, check permissions or something. @@ -3939,7 +4021,7 @@ void Server::handle_client_openc(MDRequest *mdr) // it's a file. in->inode.mode = req->head.args.open.mode; - in->inode.mode |= INODE_MODE_FILE; + in->inode.mode |= S_IFREG; in->inode.version = dn->pre_dirty() - 1; // prepare finisher diff --git a/trunk/ceph/mds/Server.h b/trunk/ceph/mds/Server.h index 281fd13ca2593..d2252f33df7bc 100644 --- a/trunk/ceph/mds/Server.h +++ b/trunk/ceph/mds/Server.h @@ -22,6 +22,7 @@ class LogEvent; class C_MDS_rename_finish; class MDRequest; class EMetaBlob; +class EUpdate; class PVList; class MMDSSlaveRequest; @@ -56,6 +57,8 @@ public: void handle_client_session(class MClientSession *m); void _session_logged(entity_inst_t ci, bool open, version_t cmapv); + void prepare_force_open_sessions(map &cm); + void finish_force_open_sessions(map &cm); void terminate_sessions(); void reconnect_clients(); void handle_client_reconnect(class MClientReconnect *m); @@ -166,7 +169,7 @@ public: void _rename_prepare_witness(MDRequest *mdr, int who, CDentry *srcdn, CDentry *destdn, CDentry *straydn); void _rename_prepare(MDRequest *mdr, - EMetaBlob *metablob, + EMetaBlob *metablob, bufferlist *client_map_bl, CDentry *srcdn, CDentry *destdn, CDentry *straydn); void _rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDentry *straydn); diff --git a/trunk/ceph/mds/events/EImportStart.h b/trunk/ceph/mds/events/EImportStart.h index aa1902576542d..5671e404298a4 100644 --- a/trunk/ceph/mds/events/EImportStart.h +++ b/trunk/ceph/mds/events/EImportStart.h @@ -30,6 +30,8 @@ protected: public: EMetaBlob metablob; + bufferlist client_map; // encoded map + version_t cmapv; EImportStart(dirfrag_t di, list& b) : LogEvent(EVENT_IMPORTSTART), @@ -44,12 +46,16 @@ protected: bl.append((char*)&base, sizeof(base)); metablob._encode(bl); ::_encode(bounds, bl); + ::_encode(cmapv, bl); + ::_encode(client_map, bl); } void decode_payload(bufferlist& bl, int& off) { bl.copy(off, sizeof(base), (char*)&base); off += sizeof(base); metablob._decode(bl, off); ::_decode(bounds, bl, off); + ::_decode(cmapv, bl, off); + ::_decode(client_map, bl, off); } bool has_expired(MDS *mds); diff --git a/trunk/ceph/mds/events/ESession.h b/trunk/ceph/mds/events/ESession.h index a8f9992486a18..3aba5559aac1c 100644 --- a/trunk/ceph/mds/events/ESession.h +++ b/trunk/ceph/mds/events/ESession.h @@ -37,14 +37,14 @@ class ESession : public LogEvent { } void encode_payload(bufferlist& bl) { - ::_encode(client_inst, bl); - ::_encode(open, bl); - ::_encode(cmapv, bl); + ::_encode(client_inst, bl); + ::_encode(open, bl); + ::_encode(cmapv, bl); } void decode_payload(bufferlist& bl, int& off) { - ::_decode(client_inst, bl, off); - ::_decode(open, bl, off); - ::_decode(cmapv, bl, off); + ::_decode(client_inst, bl, off); + ::_decode(open, bl, off); + ::_decode(cmapv, bl, off); } diff --git a/trunk/ceph/mds/events/ESessions.h b/trunk/ceph/mds/events/ESessions.h new file mode 100644 index 0000000000000..0db175c948877 --- /dev/null +++ b/trunk/ceph/mds/events/ESessions.h @@ -0,0 +1,55 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __MDS_ESESSIONS_H +#define __MDS_ESESSIONS_H + +#include +#include "config.h" +#include "include/types.h" + +#include "../LogEvent.h" + +class ESessions : public LogEvent { +protected: + version_t cmapv; // client map version + +public: + map client_map; + + ESessions() : LogEvent(EVENT_SESSION) { } + ESessions(version_t v) : + LogEvent(EVENT_SESSION), + cmapv(v) { + } + + void encode_payload(bufferlist& bl) { + ::_encode(client_map, bl); + ::_encode(cmapv, bl); + } + void decode_payload(bufferlist& bl, int& off) { + ::_decode(client_map, bl, off); + ::_decode(cmapv, bl, off); + } + + + void print(ostream& out) { + out << "ESessions " << client_map.size() << " opens cmapv " << cmapv; + } + + void update_segment(); + void replay(MDS *mds); +}; + +#endif diff --git a/trunk/ceph/mds/events/EUpdate.h b/trunk/ceph/mds/events/EUpdate.h index de965429f9bdd..3939527cef41c 100644 --- a/trunk/ceph/mds/events/EUpdate.h +++ b/trunk/ceph/mds/events/EUpdate.h @@ -22,6 +22,7 @@ class EUpdate : public LogEvent { public: EMetaBlob metablob; string type; + bufferlist client_map; EUpdate() : LogEvent(EVENT_UPDATE) { } EUpdate(MDLog *mdlog, const char *s) : @@ -37,10 +38,12 @@ public: void encode_payload(bufferlist& bl) { ::_encode(type, bl); metablob._encode(bl); + ::_encode(client_map, bl); } void decode_payload(bufferlist& bl, int& off) { ::_decode(type, bl, off); metablob._decode(bl, off); + ::_decode(client_map, bl, off); } void update_segment(); diff --git a/trunk/ceph/mds/journal.cc b/trunk/ceph/mds/journal.cc index 1f27cf713a078..3b39679dcd61f 100644 --- a/trunk/ceph/mds/journal.cc +++ b/trunk/ceph/mds/journal.cc @@ -15,6 +15,7 @@ #include "events/EString.h" #include "events/ESubtreeMap.h" #include "events/ESession.h" +#include "events/ESessions.h" #include "events/EMetaBlob.h" @@ -73,12 +74,18 @@ C_Gather *LogSegment::try_to_expire(MDS *mds) dout(6) << "LogSegment(" << offset << ").try_to_expire" << dendl; // commit dirs - for (xlist::iterator p = dirty_dirfrags.begin(); !p.end(); ++p) + for (xlist::iterator p = dirty_dirfrags.begin(); !p.end(); ++p) { + assert((*p)->is_auth()); commit.insert(*p); - for (xlist::iterator p = dirty_dentries.begin(); !p.end(); ++p) + } + for (xlist::iterator p = dirty_dentries.begin(); !p.end(); ++p) { + assert((*p)->is_auth()); commit.insert((*p)->get_dir()); - for (xlist::iterator p = dirty_inodes.begin(); !p.end(); ++p) + } + for (xlist::iterator p = dirty_inodes.begin(); !p.end(); ++p) { + assert((*p)->is_auth()); commit.insert((*p)->get_parent_dn()->get_dir()); + } if (!commit.empty()) { if (!gather) gather = new C_Gather; @@ -87,6 +94,7 @@ C_Gather *LogSegment::try_to_expire(MDS *mds) p != commit.end(); ++p) { CDir *dir = *p; + assert(dir->is_auth()); if (dir->can_auth_pin()) { dout(15) << "try_to_expire committing " << *dir << dendl; dir->commit(0, gather->new_sub()); @@ -755,7 +763,7 @@ void ESession::replay(MDS *mds) // hrm, this isn't very pretty. if (!open) - mds->clientmap.trim_completed_requests(client_inst.name.num(), 0); + mds->clientmap.trim_completed_requests(client_inst.name, 0); } else { dout(10) << "ESession.replay clientmap " << mds->clientmap.get_version() @@ -765,12 +773,31 @@ void ESession::replay(MDS *mds) mds->clientmap.open_session(client_inst); } else { mds->clientmap.close_session(client_inst.name.num()); - mds->clientmap.trim_completed_requests(client_inst.name.num(), 0); + mds->clientmap.trim_completed_requests(client_inst.name, 0); } mds->clientmap.reset_projected(); // make it follow version. } } +void ESessions::update_segment() +{ + _segment->clientmapv = cmapv; +} + +void ESessions::replay(MDS *mds) +{ + if (mds->clientmap.get_version() >= cmapv) { + dout(10) << "ESessions.replay clientmap " << mds->clientmap.get_version() + << " >= " << cmapv << ", noop" << dendl; + } else { + dout(10) << "ESessions.replay clientmap " << mds->clientmap.get_version() + << " < " << cmapv << dendl; + mds->clientmap.open_sessions(client_map); + assert(mds->clientmap.get_version() == cmapv); + mds->clientmap.reset_projected(); // make it follow version. + } +} + // ----------------------- @@ -1048,6 +1075,21 @@ void EImportStart::replay(MDS *mds) // put in ambiguous import list mds->mdcache->add_ambiguous_import(base, bounds); + + // open client sessions? + if (mds->clientmap.get_version() >= cmapv) { + dout(10) << "EImportStart.replay clientmap " << mds->clientmap.get_version() + << " >= " << cmapv << ", noop" << dendl; + } else { + dout(10) << "EImportStart.replay clientmap " << mds->clientmap.get_version() + << " < " << cmapv << dendl; + map cm; + bufferlist::iterator blp = client_map.begin(); + ::_decode_simple(cm, blp); + mds->clientmap.open_sessions(cm); + assert(mds->clientmap.get_version() == cmapv); + mds->clientmap.reset_projected(); // make it follow version. + } } // ----------------------- diff --git a/trunk/ceph/mds/mdstypes.h b/trunk/ceph/mds/mdstypes.h index ee14474761ada..c185909d89fae 100644 --- a/trunk/ceph/mds/mdstypes.h +++ b/trunk/ceph/mds/mdstypes.h @@ -49,30 +49,30 @@ using namespace std; struct metareqid_t { + entity_name_t name; uint64_t tid; - int32_t client; - int32_t _pad; - metareqid_t() : tid(0), client(-1), _pad(0) {} - metareqid_t(int c, tid_t t) : tid(t), client(c), _pad(0) {} + metareqid_t() : tid(0) {} + //metareqid_t(int c, tid_t t) : tid(t) { name = entity_name_t::CLIENT(c); } + metareqid_t(entity_name_t n, tid_t t) : name(n), tid(t) {} }; inline ostream& operator<<(ostream& out, const metareqid_t& r) { - return out << "client" << r.client << ":" << r.tid; + return out << r.name << ":" << r.tid; } inline bool operator==(const metareqid_t& l, const metareqid_t& r) { - return (l.client == r.client) && (l.tid == r.tid); + return (l.name == r.name) && (l.tid == r.tid); } inline bool operator!=(const metareqid_t& l, const metareqid_t& r) { - return (l.client != r.client) || (l.tid != r.tid); + return (l.name != r.name) || (l.tid != r.tid); } inline bool operator<(const metareqid_t& l, const metareqid_t& r) { - return (l.client < r.client) || - (l.client == r.client && l.tid < r.tid); + return (l.name < r.name) || + (l.name == r.name && l.tid < r.tid); } inline bool operator<=(const metareqid_t& l, const metareqid_t& r) { - return (l.client < r.client) || - (l.client == r.client && l.tid <= r.tid); + return (l.name < r.name) || + (l.name == r.name && l.tid <= r.tid); } inline bool operator>(const metareqid_t& l, const metareqid_t& r) { return !(l <= r); } inline bool operator>=(const metareqid_t& l, const metareqid_t& r) { return !(l < r); } @@ -81,7 +81,7 @@ namespace __gnu_cxx { template<> struct hash { size_t operator()(const metareqid_t &r) const { hash H; - return H(r.client) ^ H(r.tid); + return H(r.name.num()) ^ H(r.name.type()) ^ H(r.tid); } }; } @@ -538,6 +538,7 @@ protected: virtual bool can_auth_pin() = 0; virtual void auth_pin() = 0; virtual void auth_unpin() = 0; + virtual bool is_frozen() = 0; // -------------------------------------------- diff --git a/trunk/ceph/messages/MClientFileCaps.h b/trunk/ceph/messages/MClientFileCaps.h index a926760e72088..c8e02f61bf7a2 100644 --- a/trunk/ceph/messages/MClientFileCaps.h +++ b/trunk/ceph/messages/MClientFileCaps.h @@ -18,20 +18,26 @@ #include "msg/Message.h" #include "mds/Capability.h" + class MClientFileCaps : public Message { public: static const int OP_GRANT = 0; // mds->client grant. static const int OP_ACK = 1; // client->mds ack (if prior grant was a recall) - static const int OP_RELEASE = 2; // mds closed the cap - static const int OP_STALE = 3; // mds has exported the cap - static const int OP_REAP = 4; // mds has imported the cap from get_mds() + static const int OP_RELEASE = 2; // mds->client release cap (*) + static const int OP_EXPORT = 3; // mds has exported the cap + static const int OP_IMPORT = 4; // mds has imported the cap from get_mds() + /* + * (*) it's a bit counterintuitive, but the mds has to + * close the cap because the client isn't able to tell + * if a concurrent open() would map to the same inode. + */ static const char* get_opname(int op) { switch (op) { case OP_GRANT: return "grant"; case OP_ACK: return "ack"; case OP_RELEASE: return "release"; - case OP_STALE: return "stale"; - case OP_REAP: return "reap"; + case OP_EXPORT: return "export"; + case OP_IMPORT: return "import"; default: assert(0); return 0; } } diff --git a/trunk/ceph/messages/MClientReply.h b/trunk/ceph/messages/MClientReply.h index 3717d21e7d8ce..ca1fdf4c13143 100644 --- a/trunk/ceph/messages/MClientReply.h +++ b/trunk/ceph/messages/MClientReply.h @@ -143,6 +143,9 @@ class MClientReply : public Message { long get_tid() { return st.tid; } int get_op() { return st.op; } + void set_mdsmap_epoch(epoch_t e) { st.mdsmap_epoch = e; } + epoch_t get_mdsmap_epoch() { return st.mdsmap_epoch; } + int get_result() { return st.result; } const string& get_path() { return path; } diff --git a/trunk/ceph/messages/MClientRequest.h b/trunk/ceph/messages/MClientRequest.h index 5336f8cb51ece..e63695f401899 100644 --- a/trunk/ceph/messages/MClientRequest.h +++ b/trunk/ceph/messages/MClientRequest.h @@ -77,8 +77,7 @@ public: struct ceph_client_request_head head; // path arguments - filepath path; - string sarg; + filepath path, path2; public: // cons @@ -90,9 +89,12 @@ public: this->head.client_inst.addr = ci.addr.v; } + void set_mdsmap_epoch(epoch_t e) { head.mdsmap_epoch = e; } + epoch_t get_mdsmap_epoch() { return head.mdsmap_epoch; } + metareqid_t get_reqid() { // FIXME: for now, assume clients always have 1 incarnation - return metareqid_t(head.client_inst.name.num, head.tid); + return metareqid_t(head.client_inst.name, head.tid); } int get_open_file_mode() { @@ -158,11 +160,12 @@ public: void set_retry_attempt(int a) { head.retry_attempt = a; } void set_path(string& p) { path.set_path(p); } void set_path(const char *p) { path.set_path(p); } - void set_path(const filepath& fp) { path = fp; } + void set_filepath(const filepath& fp) { path = fp; } + void set_path2(string& p) { path2.set_path(p); } + void set_path2(const char *p) { path2.set_path(p); } + void set_filepath2(const filepath& fp) { path2 = fp; } void set_caller_uid(int u) { head.caller_uid = u; } void set_caller_gid(int g) { head.caller_gid = g; } - void set_sarg(string& arg) { this->sarg = arg; } - void set_sarg(const char *arg) { this->sarg = arg; } void set_mds_wants_replica_in_dirino(inodeno_t dirino) { head.mds_wants_replica_in_dirino = dirino; } @@ -173,8 +176,7 @@ public: entity_inst_t get_client_inst() { return entity_inst_t(head.client_inst); } - - int get_client() { return head.client_inst.name.num; } + entity_name_t get_client() { return head.client_inst.name; } tid_t get_tid() { return head.tid; } tid_t get_oldest_client_tid() { return head.oldest_client_tid; } int get_num_fwd() { return head.num_fwd; } @@ -182,10 +184,12 @@ public: int get_op() { return head.op; } int get_caller_uid() { return head.caller_uid; } int get_caller_gid() { return head.caller_gid; } - //inodeno_t get_ino() { return head.ino; } + const string& get_path() { return path.get_path(); } filepath& get_filepath() { return path; } - string& get_sarg() { return sarg; } + const string& get_path2() { return path.get_path(); } + filepath& get_filepath2() { return path2; } + inodeno_t get_mds_wants_replica_in_dirino() { return head.mds_wants_replica_in_dirino; } @@ -196,18 +200,18 @@ public: payload.copy(off, sizeof(head), (char*)&head); off += sizeof(head); path._decode(payload, off); - ::_decode(sarg, payload, off); + path2._decode(payload, off); } void encode_payload() { payload.append((char*)&head, sizeof(head)); path._encode(payload); - ::_encode(sarg, payload); + path2._encode(payload); } char *get_type_name() { return "creq"; } void print(ostream& out) { - out << "clientreq(client" << get_client() + out << "clientreq(" << get_client() << "." << get_tid() << " "; switch(get_op()) { @@ -257,10 +261,10 @@ public: out << "unknown=" << get_op(); assert(0); } - if (get_path().length()) - out << " " << get_path(); - if (get_sarg().length()) - out << " " << get_sarg(); + if (!get_filepath().empty()) + out << " " << get_filepath(); + if (!get_filepath2().empty()) + out << " " << get_filepath2(); if (head.retry_attempt) out << " RETRY=" << head.retry_attempt; out << ")"; diff --git a/trunk/ceph/messages/MDirUpdate.h b/trunk/ceph/messages/MDirUpdate.h index 0db32208efd45..87d7e4fa7389b 100644 --- a/trunk/ceph/messages/MDirUpdate.h +++ b/trunk/ceph/messages/MDirUpdate.h @@ -25,14 +25,14 @@ class MDirUpdate : public Message { int discover; } st; set dir_rep_by; - string path; + filepath path; public: dirfrag_t get_dirfrag() { return st.dirfrag; } int get_dir_rep() { return st.dir_rep; } set& get_dir_rep_by() { return dir_rep_by; } bool should_discover() { return st.discover > 0; } - string& get_path() { return path; } + filepath& get_path() { return path; } void tried_discover() { if (st.discover) st.discover--; @@ -42,7 +42,7 @@ class MDirUpdate : public Message { MDirUpdate(dirfrag_t dirfrag, int dir_rep, set& dir_rep_by, - string& path, + filepath& path, bool discover = false) : Message(MSG_MDS_DIRUPDATE) { this->st.dirfrag = dirfrag; @@ -52,19 +52,22 @@ class MDirUpdate : public Message { this->path = path; } virtual char *get_type_name() { return "dir_update"; } + void print(ostream& out) { + out << "dir_update(" << get_dirfrag() << ")"; + } virtual void decode_payload() { int off = 0; payload.copy(off, sizeof(st), (char*)&st); off += sizeof(st); ::_decode(dir_rep_by, payload, off); - ::_decode(path, payload, off); + path._decode(payload, off); } virtual void encode_payload() { payload.append((char*)&st, sizeof(st)); ::_encode(dir_rep_by, payload); - ::_encode(path, payload); + path._encode(payload); } }; diff --git a/trunk/ceph/messages/MExportCaps.h b/trunk/ceph/messages/MExportCaps.h new file mode 100644 index 0000000000000..f2057bfb1ebc1 --- /dev/null +++ b/trunk/ceph/messages/MExportCaps.h @@ -0,0 +1,50 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#ifndef __MEXPORTCAPS_H +#define __MEXPORTCAPS_H + +#include "msg/Message.h" + + +class MExportCaps : public Message { + public: + inodeno_t ino; + bufferlist cap_bl; + map client_map; + + MExportCaps() : + Message(MSG_MDS_EXPORTCAPS) {} + + virtual char *get_type_name() { return "export_caps"; } + void print(ostream& o) { + o << "export_caps(" << ino << ")"; + } + + virtual void decode_payload() { + int off = 0; + ::_decode(ino, payload, off); + ::_decode(cap_bl, payload, off); + ::_decode(client_map, payload, off); + } + virtual void encode_payload() { + ::_encode(ino, payload); + ::_encode(cap_bl, payload); + ::_encode(client_map, payload); + } + +}; + +#endif diff --git a/trunk/ceph/messages/MExportCapsAck.h b/trunk/ceph/messages/MExportCapsAck.h new file mode 100644 index 0000000000000..dd5e212ecfd99 --- /dev/null +++ b/trunk/ceph/messages/MExportCapsAck.h @@ -0,0 +1,46 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#ifndef __MEXPORTCAPSACK_H +#define __MEXPORTCAPSACK_H + +#include "msg/Message.h" + + +class MExportCapsAck : public Message { + public: + inodeno_t ino; + + MExportCapsAck() : + Message(MSG_MDS_EXPORTCAPSACK) {} + MExportCapsAck(inodeno_t i) : + Message(MSG_MDS_EXPORTCAPSACK), ino(i) {} + + virtual char *get_type_name() { return "export_caps_ack"; } + void print(ostream& o) { + o << "export_caps_ack(" << ino << ")"; + } + + virtual void decode_payload() { + int off = 0; + ::_decode(ino, payload, off); + } + virtual void encode_payload() { + ::_encode(ino, payload); + } + +}; + +#endif diff --git a/trunk/ceph/messages/MExportDirDiscover.h b/trunk/ceph/messages/MExportDirDiscover.h index c311d1e87e940..01c61a67648c3 100644 --- a/trunk/ceph/messages/MExportDirDiscover.h +++ b/trunk/ceph/messages/MExportDirDiscover.h @@ -21,12 +21,12 @@ class MExportDirDiscover : public Message { dirfrag_t dirfrag; - string path; + filepath path; public: inodeno_t get_ino() { return dirfrag.ino; } dirfrag_t get_dirfrag() { return dirfrag; } - string& get_path() { return path; } + filepath& get_path() { return path; } bool started; @@ -47,12 +47,12 @@ class MExportDirDiscover : public Message { virtual void decode_payload() { bufferlist::iterator p = payload.begin(); ::_decode_simple(dirfrag, p); - ::_decode_simple(path, p); + path._decode(p); } virtual void encode_payload() { ::_encode_simple(dirfrag, payload); - ::_encode_simple(path, payload); + path._encode(payload); } }; diff --git a/trunk/ceph/messages/MMDSGetMap.h b/trunk/ceph/messages/MMDSGetMap.h index 80f753bd365eb..adf47fd649396 100644 --- a/trunk/ceph/messages/MMDSGetMap.h +++ b/trunk/ceph/messages/MMDSGetMap.h @@ -18,20 +18,22 @@ #include "msg/Message.h" #include "include/types.h" +#include "include/encodable.h" class MMDSGetMap : public Message { public: - MMDSGetMap() : Message(CEPH_MSG_MDS_GETMAP) { } + epoch_t have; + + MMDSGetMap(epoch_t h=0) : Message(CEPH_MSG_MDS_GETMAP), have (h) { } char *get_type_name() { return "mdsgetmap"; } void encode_payload() { - //payload.append((char*)&sb, sizeof(sb)); + ::_encode_simple(have, payload); } void decode_payload() { - //int off = 0; - //payload.copy(off, sizeof(sb), (char*)&sb); - //off += sizeof(sb); + bufferlist::iterator p = payload.begin(); + ::_decode_simple(have, p); } }; diff --git a/trunk/ceph/messages/MMDSSlaveRequest.h b/trunk/ceph/messages/MMDSSlaveRequest.h index 5ef65223ec1c9..a5c2339fd4cd5 100644 --- a/trunk/ceph/messages/MMDSSlaveRequest.h +++ b/trunk/ceph/messages/MMDSSlaveRequest.h @@ -77,8 +77,8 @@ class MMDSSlaveRequest : public Message { public: // for rename prep - string srcdnpath; - string destdnpath; + filepath srcdnpath; + filepath destdnpath; set witnesses; bufferlist inode_export; version_t inode_export_v; @@ -110,8 +110,8 @@ public: ::_encode(lock_type, payload); object_info._encode(payload); ::_encode_complex(authpins, payload); - ::_encode(srcdnpath, payload); - ::_encode(destdnpath, payload); + srcdnpath._encode(payload); + destdnpath._encode(payload); ::_encode(witnesses, payload); ::_encode(now, payload); ::_encode(inode_export, payload); @@ -126,8 +126,8 @@ public: ::_decode_simple(lock_type, p); object_info._decode(p); ::_decode_complex(authpins, p); - ::_decode_simple(srcdnpath, p); - ::_decode_simple(destdnpath, p); + srcdnpath._decode(p); + destdnpath._decode(p); ::_decode_simple(witnesses, p); ::_decode_simple(now, p); ::_decode_simple(inode_export, p); diff --git a/trunk/ceph/mon/MDSMonitor.cc b/trunk/ceph/mon/MDSMonitor.cc index be8e760986847..7fd788afb2b26 100644 --- a/trunk/ceph/mon/MDSMonitor.cc +++ b/trunk/ceph/mon/MDSMonitor.cc @@ -41,7 +41,7 @@ // my methods -void MDSMonitor::print_map(MDSMap &m) +void MDSMonitor::print_map(MDSMap &m, int dbl) { dout(7) << "print_map epoch " << m.get_epoch() << " max " << m.max_mds << dendl; entity_inst_t blank; @@ -100,8 +100,8 @@ bool MDSMonitor::update_from_paxos() mdsmap.decode(mdsmap_bl); // new map - dout(7) << "new map:" << dendl; - print_map(mdsmap); + dout(0) << "new map" << dendl; + print_map(mdsmap, 0); // bcast map to mds, waiters if (mon->is_leader()) @@ -153,7 +153,7 @@ bool MDSMonitor::preprocess_query(Message *m) return preprocess_beacon((MMDSBeacon*)m); case CEPH_MSG_MDS_GETMAP: - send_full(m->get_source_inst()); + handle_mds_getmap((MMDSGetMap*)m); return true; case MSG_MON_COMMAND: @@ -166,6 +166,14 @@ bool MDSMonitor::preprocess_query(Message *m) } } +void MDSMonitor::handle_mds_getmap(MMDSGetMap *m) +{ + if (m->have < mdsmap.get_epoch()) + send_full(m->get_source_inst()); + else + waiting_for_map.push_back(m->get_source_inst()); +} + bool MDSMonitor::preprocess_beacon(MMDSBeacon *m) { diff --git a/trunk/ceph/mon/MDSMonitor.h b/trunk/ceph/mon/MDSMonitor.h index c4dc095236501..76420621d07a2 100644 --- a/trunk/ceph/mon/MDSMonitor.h +++ b/trunk/ceph/mon/MDSMonitor.h @@ -27,6 +27,7 @@ using namespace std; #include "PaxosService.h" class MMDSBeacon; +class MMDSGetMap; class MDSMonitor : public PaxosService { public: @@ -37,7 +38,7 @@ class MDSMonitor : public PaxosService { MDSMap pending_mdsmap; // current + pending updates // my helpers - void print_map(MDSMap &m); + void print_map(MDSMap &m, int dbl=7); class C_Updated : public Context { MDSMonitor *mm; @@ -72,6 +73,7 @@ class MDSMonitor : public PaxosService { bool preprocess_beacon(class MMDSBeacon *m); bool handle_beacon(class MMDSBeacon *m); bool handle_command(class MMonCommand *m); + void handle_mds_getmap(MMDSGetMap *m); void take_over(entity_addr_t addr, int mds); diff --git a/trunk/ceph/msg/FakeMessenger.cc b/trunk/ceph/msg/FakeMessenger.cc index 7d984c23d2eab..5acbfcc0be774 100644 --- a/trunk/ceph/msg/FakeMessenger.cc +++ b/trunk/ceph/msg/FakeMessenger.cc @@ -298,7 +298,8 @@ FakeMessenger::FakeMessenger(entity_name_t me) : Messenger(me) lock.Unlock(); - dout(0) << "fakemessenger " << get_myname() << " messenger is " << this << " at " << _myinst << dendl; + dout(0) << "fakemessenger " << get_myname() << " messenger is " << this + << " at " << get_myaddr() << dendl; qlen = 0; @@ -348,10 +349,8 @@ int FakeMessenger::shutdown() void FakeMessenger::reset_myname(entity_name_t m) { dout(1) << "reset_myname from " << get_myname() << " to " << m << dendl; - _set_myname(m); - _myinst.name = m; - + // put myself in the fail queue? if (g_fake_kill_after.count(m)) { utime_t w = start_time; diff --git a/trunk/ceph/msg/FakeMessenger.h b/trunk/ceph/msg/FakeMessenger.h index 0b08b8c9d4c55..698eee5184b69 100644 --- a/trunk/ceph/msg/FakeMessenger.h +++ b/trunk/ceph/msg/FakeMessenger.h @@ -32,8 +32,6 @@ class FakeMessenger : public Messenger { int qlen; list incoming; // incoming queue - entity_inst_t _myinst; - public: bool failed; @@ -42,13 +40,6 @@ class FakeMessenger : public Messenger { virtual int shutdown(); - const entity_inst_t& get_myinst() { - return _myinst; - }; - const entity_addr_t& get_myaddr() { - return _myinst.addr; - } - void reset_myname(entity_name_t m); // msg interface diff --git a/trunk/ceph/msg/Message.cc b/trunk/ceph/msg/Message.cc index 6e5724a2dc697..fbf73e629b696 100644 --- a/trunk/ceph/msg/Message.cc +++ b/trunk/ceph/msg/Message.cc @@ -80,6 +80,10 @@ using namespace std; #include "messages/MExportDirNotifyAck.h" #include "messages/MExportDirFinish.h" +#include "messages/MExportCaps.h" +#include "messages/MExportCapsAck.h" + + #include "messages/MDentryUnlink.h" #include "messages/MHeartbeat.h" @@ -307,6 +311,13 @@ decode_message(ceph_msg_header& env, bufferlist& front, bufferlist& data) m = new MExportDirWarningAck; break; + + case MSG_MDS_EXPORTCAPS: + m = new MExportCaps; + break; + case MSG_MDS_EXPORTCAPSACK: + m = new MExportCapsAck; + break; case MSG_MDS_DENTRYUNLINK: diff --git a/trunk/ceph/msg/Message.h b/trunk/ceph/msg/Message.h index 01f492f0df626..6bf468524b2ad 100644 --- a/trunk/ceph/msg/Message.h +++ b/trunk/ceph/msg/Message.h @@ -73,6 +73,9 @@ #define MSG_MDS_EXPORTDIRNOTIFYACK 0x459 #define MSG_MDS_EXPORTDIRFINISH 0x460 +#define MSG_MDS_EXPORTCAPS 0x470 +#define MSG_MDS_EXPORTCAPSACK 0x471 + #define MSG_MDS_BEACON 90 // to monitor #define MSG_MDS_SLAVE_REQUEST 91 diff --git a/trunk/ceph/msg/Messenger.h b/trunk/ceph/msg/Messenger.h index 1bb9c8acb28ed..94ee67e1287cd 100644 --- a/trunk/ceph/msg/Messenger.h +++ b/trunk/ceph/msg/Messenger.h @@ -34,21 +34,23 @@ class Timer; class Messenger { private: Dispatcher *dispatcher; - entity_name_t _myname; + +protected: + entity_inst_t _myinst; public: - Messenger(entity_name_t w) : dispatcher(0), _myname(w) { } + Messenger(entity_name_t w) : dispatcher(0) { + _myinst.name = w; + } virtual ~Messenger() { } // accessors - entity_name_t get_myname() { return _myname; } - void _set_myname(entity_name_t m) { _myname = m; } - + entity_name_t get_myname() { return _myinst.name; } + const entity_addr_t& get_myaddr() { return _myinst.addr; } + const entity_inst_t& get_myinst() { return _myinst; } + + void _set_myname(entity_name_t m) { _myinst.name = m; } virtual void reset_myname(entity_name_t m) = 0; - - virtual const entity_addr_t &get_myaddr() = 0; - - entity_inst_t get_myinst() { return entity_inst_t(_myname, get_myaddr()); } // hrmpf. virtual int get_dispatch_queue_len() { return 0; }; diff --git a/trunk/ceph/msg/SimpleMessenger.cc b/trunk/ceph/msg/SimpleMessenger.cc index 59c1235658ea5..8eb201b224a9a 100644 --- a/trunk/ceph/msg/SimpleMessenger.cc +++ b/trunk/ceph/msg/SimpleMessenger.cc @@ -612,8 +612,6 @@ void Rank::EntityMessenger::reset_myname(entity_name_t newname) } - - void Rank::EntityMessenger::mark_down(entity_addr_t a) { rank.mark_down(a); @@ -621,34 +619,8 @@ void Rank::EntityMessenger::mark_down(entity_addr_t a) void Rank::mark_down(entity_addr_t addr) { - //if (my_rank == 0) return; // ugh.. rank0 already handles this stuff in the namer lock.Lock(); - /* - if (entity_map.count(a) && - entity_map[a] > inst) { - dout(10) << "mark_down " << a << " inst " << inst << " < " << entity_map[a] << dendl; - derr(10) << "mark_down " << a << " inst " << inst << " < " << entity_map[a] << dendl; - // do nothing! - } else { - if (entity_map.count(a) == 0) { - // don't know it - dout(10) << "mark_down " << a << " inst " << inst << " ... unknown by me" << dendl; - derr(10) << "mark_down " << a << " inst " << inst << " ... unknown by me" << dendl; - } else { - // know it - assert(entity_map[a] <= inst); - dout(10) << "mark_down " << a << " inst " << inst << dendl; - derr(10) << "mark_down " << a << " inst " << inst << dendl; - - entity_map.erase(a); - - if (rank_pipe.count(inst)) { - rank_pipe[inst]->close(); - rank_pipe.erase(inst); - } - } - } - */ + // FIXME lock.Unlock(); } diff --git a/trunk/ceph/msg/SimpleMessenger.h b/trunk/ceph/msg/SimpleMessenger.h index af2de93c2b134..7a0f0e4213aaa 100644 --- a/trunk/ceph/msg/SimpleMessenger.h +++ b/trunk/ceph/msg/SimpleMessenger.h @@ -228,8 +228,8 @@ private: } public: - EntityMessenger(entity_name_t myaddr, int r) : - Messenger(myaddr), + EntityMessenger(entity_name_t name, int r) : + Messenger(name), stop(false), qlen(0), pqlen(0), my_rank(r), @@ -247,8 +247,6 @@ private: dispatch_thread.join(); } - const entity_addr_t &get_myaddr() { return my_addr; } - int get_dispatch_queue_len() { return qlen + pqlen; } void reset_myname(entity_name_t m); diff --git a/trunk/ceph/msg/msg_types.h b/trunk/ceph/msg/msg_types.h index 79de4fe43914f..b79fc70a3cf2e 100644 --- a/trunk/ceph/msg/msg_types.h +++ b/trunk/ceph/msg/msg_types.h @@ -34,7 +34,8 @@ public: // cons entity_name_t() { v.type = v.num = 0; } - entity_name_t(int t, int n=NEW) { v.type = t; v.num = n; } + entity_name_t(int t, int n) { v.type = t; v.num = n; } + entity_name_t(const ceph_entity_name &n) : v(n) { } // static cons static entity_name_t MON(int i=NEW) { return entity_name_t(TYPE_MON, i); } diff --git a/trunk/ceph/osdc/Journaler.h b/trunk/ceph/osdc/Journaler.h index a90ec5f9e348f..7f7a5753ad708 100644 --- a/trunk/ceph/osdc/Journaler.h +++ b/trunk/ceph/osdc/Journaler.h @@ -205,6 +205,7 @@ public: bool is_active() { return state == STATE_ACTIVE; } off_t get_write_pos() const { return write_pos; } + off_t get_write_ack_pos() const { return ack_pos; } off_t get_read_pos() const { return read_pos; } off_t get_expire_pos() const { return expire_pos; } off_t get_trimmed_pos() const { return trimmed_pos; } -- 2.39.5