From 4ba592e1d9599ff79c65840ef59d6d59c54e66c0 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 2 Jan 2008 14:02:30 -0800 Subject: [PATCH] caps in per-session list; cleaned up cap bit defs --- src/client/Client.cc | 30 ++++++++--------- src/client/Client.h | 10 +++--- src/client/FileCache.cc | 24 +++++++------- src/client/FileCache.h | 4 +-- src/include/ceph_fs.h | 10 ++++++ src/mds/CInode.h | 29 ++++++++-------- src/mds/Capability.h | 54 +++++++++++++++--------------- src/mds/FileLock.h | 28 ++++++++-------- src/mds/Locker.cc | 60 ++++++++++++++++------------------ src/mds/Locker.h | 2 +- src/mds/MDCache.cc | 13 ++++---- src/mds/Server.cc | 10 +++--- src/mds/SessionMap.h | 12 ++++--- src/messages/MClientFileCaps.h | 1 - src/messages/MPing.h | 17 ++++++---- src/messages/MPingAck.h | 22 +++++++++---- 16 files changed, 175 insertions(+), 151 deletions(-) diff --git a/src/client/Client.cc b/src/client/Client.cc index 25ca5a1527728..6c1dd5029e703 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -395,7 +395,7 @@ Inode* Client::insert_inode(Dir *dir, InodeStat *st, const string& dname) dn->inode->mask = st->mask; // or do we have newer size/mtime from writing? - if (dn->inode->file_caps() & CAP_FILE_WR) { + if (dn->inode->file_caps() & CEPH_CAP_WR) { if (dn->inode->file_wr_size > dn->inode->inode.size) dn->inode->inode.size = dn->inode->file_wr_size; if (dn->inode->file_wr_mtime > dn->inode->inode.mtime) @@ -1240,8 +1240,8 @@ void Client::handle_file_caps(MClientFileCaps *m) << " was " << cap_string(old_caps) << dendl; // did file size decrease? - if ((old_caps & (CAP_FILE_RD|CAP_FILE_WR)) == 0 && - (new_caps & (CAP_FILE_RD|CAP_FILE_WR)) != 0 && + if ((old_caps & (CEPH_CAP_RD|CEPH_CAP_WR)) == 0 && + (new_caps & (CEPH_CAP_RD|CEPH_CAP_WR)) != 0 && in->inode.size > m->get_inode().size) { dout(10) << "*** file size decreased from " << in->inode.size << " to " << m->get_inode().size << dendl; @@ -1278,7 +1278,7 @@ void Client::handle_file_caps(MClientFileCaps *m) // caching off. // wake up waiters? - if (new_caps & CAP_FILE_RD) { + if (new_caps & CEPH_CAP_RD) { for (list::iterator it = in->waitfor_read.begin(); it != in->waitfor_read.end(); it++) { @@ -1287,7 +1287,7 @@ void Client::handle_file_caps(MClientFileCaps *m) } in->waitfor_read.clear(); } - if (new_caps & CAP_FILE_WR) { + if (new_caps & CEPH_CAP_WR) { for (list::iterator it = in->waitfor_write.begin(); it != in->waitfor_write.end(); it++) { @@ -1296,7 +1296,7 @@ void Client::handle_file_caps(MClientFileCaps *m) } in->waitfor_write.clear(); } - if (new_caps & CAP_FILE_LAZYIO) { + if (new_caps & CEPH_CAP_LAZYIO) { for (list::iterator it = in->waitfor_lazy.begin(); it != in->waitfor_lazy.end(); it++) { @@ -2662,8 +2662,8 @@ void Client::close_release(Inode *in) in->fc.release_clean(); int retain = 0; - if (in->num_open_wr || in->fc.is_dirty()) retain |= CAP_FILE_WR | CAP_FILE_WRBUFFER | CAP_FILE_WREXTEND; - if (in->num_open_rd || in->fc.is_cached()) retain |= CAP_FILE_RD | CAP_FILE_RDCACHE; + if (in->num_open_wr || in->fc.is_dirty()) retain |= CEPH_CAP_WR | CEPH_CAP_WRBUFFER | CEPH_CAP_WREXTEND; + if (in->num_open_rd || in->fc.is_cached()) retain |= CEPH_CAP_RD | CEPH_CAP_RDCACHE; release_caps(in, retain); // release caps now. } @@ -2853,7 +2853,7 @@ int Client::_read(Fh *f, off_t offset, off_t size, bufferlist *bl) // determine whether read range overlaps with file // ...ONLY if we're doing async io - if (!lazy && (in->file_caps() & (CAP_FILE_WRBUFFER|CAP_FILE_RDCACHE))) { + if (!lazy && (in->file_caps() & (CEPH_CAP_WRBUFFER|CEPH_CAP_RDCACHE))) { // we're doing buffered i/o. make sure we're inside the file. // we can trust size info bc we get accurate info when buffering/caching caps are issued. dout(10) << "file size: " << in->inode.size << dendl; @@ -2898,14 +2898,14 @@ int Client::_read(Fh *f, off_t offset, off_t size, bufferlist *bl) // object cache OFF -- legacy inconsistent way. // do we have read file cap? - while (!lazy && (in->file_caps() & CAP_FILE_RD) == 0) { + while (!lazy && (in->file_caps() & CEPH_CAP_RD) == 0) { dout(7) << " don't have read cap, waiting" << dendl; Cond cond; in->waitfor_read.push_back(&cond); cond.Wait(client_lock); } // lazy cap? - while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) { + while (lazy && (in->file_caps() & CEPH_CAP_LAZYIO) == 0) { dout(7) << " don't have lazy cap, waiting" << dendl; Cond cond; in->waitfor_lazy.push_back(&cond); @@ -3021,13 +3021,13 @@ int Client::_write(Fh *f, off_t offset, off_t size, const char *buf) dout(7) << "synchronous write" << dendl; // do we have write file cap? - while (!lazy && (in->file_caps() & CAP_FILE_WR) == 0) { + while (!lazy && (in->file_caps() & CEPH_CAP_WR) == 0) { dout(7) << " don't have write cap, waiting" << dendl; Cond cond; in->waitfor_write.push_back(&cond); cond.Wait(client_lock); } - while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) { + while (lazy && (in->file_caps() & CEPH_CAP_LAZYIO) == 0) { dout(7) << " don't have lazy cap, waiting" << dendl; Cond cond; in->waitfor_lazy.push_back(&cond); @@ -3283,7 +3283,7 @@ int Client::lazyio_propogate(int fd, off_t offset, size_t count) if (f->mode & FILE_MODE_LAZY) { // wait for lazy cap - while ((in->file_caps() & CAP_FILE_LAZYIO) == 0) { + while ((in->file_caps() & CEPH_CAP_LAZYIO) == 0) { dout(7) << " don't have lazy cap, waiting" << dendl; Cond cond; in->waitfor_lazy.push_back(&cond); @@ -3319,7 +3319,7 @@ int Client::lazyio_synchronize(int fd, off_t offset, size_t count) if (f->mode & FILE_MODE_LAZY) { // wait for lazy cap - while ((in->file_caps() & CAP_FILE_LAZYIO) == 0) { + while ((in->file_caps() & CEPH_CAP_LAZYIO) == 0) { dout(7) << " don't have lazy cap, waiting" << dendl; Cond cond; in->waitfor_lazy.push_back(&cond); diff --git a/src/client/Client.h b/src/client/Client.h index ed0b38ed33d01..7881c56e5ac5d 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -228,11 +228,11 @@ class Inode { int file_caps_wanted() { int w = 0; - if (num_open_rd) w |= CAP_FILE_RD|CAP_FILE_RDCACHE; - if (num_open_wr) w |= CAP_FILE_WR|CAP_FILE_WRBUFFER; - if (num_open_lazy) w |= CAP_FILE_LAZYIO; - if (fc.is_dirty()) w |= CAP_FILE_WRBUFFER; - if (fc.is_cached()) w |= CAP_FILE_RDCACHE; + if (num_open_rd) w |= CEPH_CAP_RD|CEPH_CAP_RDCACHE; + if (num_open_wr) w |= CEPH_CAP_WR|CEPH_CAP_WRBUFFER; + if (num_open_lazy) w |= CEPH_CAP_LAZYIO; + if (fc.is_dirty()) w |= CEPH_CAP_WRBUFFER; + if (fc.is_cached()) w |= CEPH_CAP_RDCACHE; return w; } diff --git a/src/client/FileCache.cc b/src/client/FileCache.cc index 1adec4aaabee7..344148da2d614 100644 --- a/src/client/FileCache.cc +++ b/src/client/FileCache.cc @@ -123,10 +123,10 @@ void FileCache::set_caps(int caps, Context *onimplement) int FileCache::get_used_caps() { int used = 0; - if (num_reading) used |= CAP_FILE_RD; - if (oc->set_is_cached(inode.ino)) used |= CAP_FILE_RDCACHE; - if (num_writing) used |= CAP_FILE_WR; - if (oc->set_is_dirty_or_committing(inode.ino)) used |= CAP_FILE_WRBUFFER; + if (num_reading) used |= CEPH_CAP_RD; + if (oc->set_is_cached(inode.ino)) used |= CEPH_CAP_RDCACHE; + if (num_writing) used |= CEPH_CAP_WR; + if (oc->set_is_dirty_or_committing(inode.ino)) used |= CEPH_CAP_WRBUFFER; return used; } @@ -138,11 +138,11 @@ void FileCache::check_caps() // try to implement caps? // BUG? latest_caps, not least caps i've seen? - if ((latest_caps & CAP_FILE_RDCACHE) == 0 && - (used & CAP_FILE_RDCACHE)) + if ((latest_caps & CEPH_CAP_RDCACHE) == 0 && + (used & CEPH_CAP_RDCACHE)) release_clean(); - if ((latest_caps & CAP_FILE_WRBUFFER) == 0 && - (used & CAP_FILE_WRBUFFER)) + if ((latest_caps & CEPH_CAP_WRBUFFER) == 0 && + (used & CEPH_CAP_WRBUFFER)) flush_dirty(new C_FC_CheckCaps(this)); used = get_used_caps(); @@ -176,7 +176,7 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_ int r = 0; // can i read? - while ((latest_caps & CAP_FILE_RD) == 0) { + while ((latest_caps & CEPH_CAP_RD) == 0) { dout(10) << "read doesn't have RD cap, blocking" << dendl; Cond c; waitfor_read.insert(&c); @@ -187,7 +187,7 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_ // inc reading counter num_reading++; - if (latest_caps & CAP_FILE_RDCACHE) { + if (latest_caps & CEPH_CAP_RDCACHE) { // read (and block) Cond cond; bool done = false; @@ -221,7 +221,7 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_ void FileCache::write(off_t offset, size_t size, bufferlist& blist, Mutex& client_lock) { // can i write - while ((latest_caps & CAP_FILE_WR) == 0) { + while ((latest_caps & CEPH_CAP_WR) == 0) { dout(10) << "write doesn't have WR cap, blocking" << dendl; Cond c; waitfor_write.insert(&c); @@ -233,7 +233,7 @@ void FileCache::write(off_t offset, size_t size, bufferlist& blist, Mutex& clien num_writing++; if (size > 0) { - if (latest_caps & CAP_FILE_WRBUFFER) { // caps buffered write? + if (latest_caps & CEPH_CAP_WRBUFFER) { // caps buffered write? // wait? (this may block!) oc->wait_for_write(size, client_lock); diff --git a/src/client/FileCache.h b/src/client/FileCache.h index 8d6e08146b508..03322fb929a0a 100644 --- a/src/client/FileCache.h +++ b/src/client/FileCache.h @@ -53,8 +53,8 @@ class FileCache { } // waiters/waiting - bool can_read() { return latest_caps & CAP_FILE_RD; } - bool can_write() { return latest_caps & CAP_FILE_WR; } + bool can_read() { return latest_caps & CEPH_CAP_RD; } + bool can_write() { return latest_caps & CEPH_CAP_WR; } bool all_safe();// { return num_unsafe == 0; } void add_safe_waiter(Context *c); diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index dca8e6ebe1b39..5df1ea8fbaf22 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -65,6 +65,16 @@ static inline __u32 frag_value(__u32 f) { return f & 0xffffffu; } static inline __u32 frag_mask(__u32 f) { return 0xffffffffull >> (32-frag_bits(f)); } static inline __u32 frag_next(__u32 f) { return (frag_bits(f) << 24) | (frag_value(f)+1); } +/* + * file caps + */ +#define CEPH_CAP_RDCACHE 1 // client can safely cache reads +#define CEPH_CAP_RD 2 // client can read +#define CEPH_CAP_WR 4 // client can write +#define CEPH_CAP_WREXTEND 8 // client can extend file +#define CEPH_CAP_WRBUFFER 16 // client can safely buffer writes +#define CEPH_CAP_LAZYIO 32 // client can perform lazy io + /* * object layout - how objects are mapped into PGs diff --git a/src/mds/CInode.h b/src/mds/CInode.h index b6da550a8d7b4..c9ce121b05ab6 100644 --- a/src/mds/CInode.h +++ b/src/mds/CInode.h @@ -343,11 +343,19 @@ public: // client caps bool is_any_caps() { return !client_caps.empty(); } map& get_client_caps() { return client_caps; } - void add_client_cap(int client, Capability& cap) { + Capability *get_client_cap(int client) { + if (client_caps.count(client)) + return &client_caps[client]; + return 0; + } + Capability *add_client_cap(int client, CInode *in, xlist& cls) { if (client_caps.empty()) get(PIN_CAPS); assert(client_caps.count(client) == 0); - client_caps[client] = cap; + Capability *cap = &client_caps[client]; + cap->set_inode(in); + cap->add_to_cap_list(cls); + return cap; } void remove_client_cap(int client) { assert(client_caps.count(client) == 1); @@ -355,19 +363,14 @@ public: if (client_caps.empty()) put(PIN_CAPS); } - Capability* get_client_cap(int client) { - if (client_caps.count(client)) - return &client_caps[client]; - return 0; - } - void reconnect_cap(int client, inode_caps_reconnect_t& icr) { + void reconnect_cap(int client, inode_caps_reconnect_t& icr, xlist& cls) { Capability *cap = get_client_cap(client); if (cap) { cap->merge(icr.wanted, icr.issued); } else { - Capability newcap(icr.wanted, 0); - newcap.issue(icr.issued); - add_client_cap(client, newcap); + cap = add_client_cap(client, this, cls); + cap->set_wanted(icr.wanted); + cap->issue(icr.issued); } inode.size = MAX(inode.size, icr.size); inode.mtime = MAX(inode.mtime, icr.mtime); @@ -406,7 +409,7 @@ public: client_caps[it->first].merge(it->second); } else { // new - client_caps[it->first] = Capability(it->second); + client_caps[it->first] = Capability(this, it->second); } } } @@ -448,7 +451,7 @@ public: linklock.replicate_relax(); dirfragtreelock.replicate_relax(); - if (get_caps_issued() & (CAP_FILE_WR|CAP_FILE_WRBUFFER) == 0) + if ((get_caps_issued() & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) == 0) filelock.replicate_relax(); dirlock.replicate_relax(); diff --git a/src/mds/Capability.h b/src/mds/Capability.h index 5c014fd1f85b1..8df19e6a70b22 100644 --- a/src/mds/Capability.h +++ b/src/mds/Capability.h @@ -25,34 +25,27 @@ using namespace std; #include "config.h" -// define caps -#define CAP_FILE_RDCACHE 1 // client can safely cache reads -#define CAP_FILE_RD 2 // client can read -#define CAP_FILE_WR 4 // client can write -#define CAP_FILE_WREXTEND 8 // client can extend file -#define CAP_FILE_WRBUFFER 16 // client can safely buffer writes -#define CAP_FILE_LAZYIO 32 // client can perform lazy io - - // heuristics -//#define CAP_FILE_DELAYFLUSH 32 +//#define CEPH_CAP_DELAYFLUSH 32 inline string cap_string(int cap) { string s; s = "["; - if (cap & CAP_FILE_RDCACHE) s += " rdcache"; - if (cap & CAP_FILE_RD) s += " rd"; - if (cap & CAP_FILE_WR) s += " wr"; - if (cap & CAP_FILE_WRBUFFER) s += " wrbuffer"; - if (cap & CAP_FILE_WRBUFFER) s += " wrextend"; - if (cap & CAP_FILE_LAZYIO) s += " lazyio"; + if (cap & CEPH_CAP_RDCACHE) s += " rdcache"; + if (cap & CEPH_CAP_RD) s += " rd"; + if (cap & CEPH_CAP_WR) s += " wr"; + if (cap & CEPH_CAP_WRBUFFER) s += " wrbuffer"; + if (cap & CEPH_CAP_WRBUFFER) s += " wrextend"; + if (cap & CEPH_CAP_LAZYIO) s += " lazyio"; s += " ]"; return s; } typedef uint32_t capseq_t; +class CInode; + class Capability { public: struct Export { @@ -64,6 +57,7 @@ public: }; private: + CInode *inode; int wanted_caps; // what the client wants (ideally) map cap_history; // seq -> cap @@ -71,19 +65,21 @@ private: bool suppress; public: - xlist::item xlist_item; + xlist::item session_caps_item; - Capability(int want=0, capseq_t s=0) : + Capability(CInode *i=0, int want=0, capseq_t s=0) : + inode(i), wanted_caps(want), last_sent(s), last_recv(s), suppress(false), - xlist_item(this) { + session_caps_item(this) { } - Capability(Export& other) : + Capability(CInode *i, Export& other) : + inode(i), wanted_caps(other.wanted), last_sent(0), last_recv(0), - xlist_item(this) { + session_caps_item(this) { // issued vs pending if (other.issued & ~other.pending) issue(other.issued); @@ -93,6 +89,12 @@ public: bool is_suppress() { return suppress; } void set_suppress(bool b) { suppress = b; } + CInode *get_inode() { return inode; } + void set_inode(CInode *i) { inode = i; } + void add_to_cap_list(xlist& ls) { + ls.push_back(&session_caps_item); + } + bool is_null() { return cap_history.empty() && wanted_caps == 0; } // most recently issued caps. @@ -130,17 +132,17 @@ public: // needed static int needed(int from) { // strip out wrbuffer, rdcache - return from & (CAP_FILE_WR|CAP_FILE_RD); + return from & (CEPH_CAP_WR|CEPH_CAP_RD); } int needed() { return needed(wanted_caps); } // conflicts static int conflicts(int from) { int c = 0; - if (from & CAP_FILE_WRBUFFER) c |= CAP_FILE_RDCACHE|CAP_FILE_RD; - if (from & CAP_FILE_WR) c |= CAP_FILE_RDCACHE; - if (from & CAP_FILE_RD) c |= CAP_FILE_WRBUFFER; - if (from & CAP_FILE_RDCACHE) c |= CAP_FILE_WRBUFFER|CAP_FILE_WR; + if (from & CEPH_CAP_WRBUFFER) c |= CEPH_CAP_RDCACHE|CEPH_CAP_RD; + if (from & CEPH_CAP_WR) c |= CEPH_CAP_RDCACHE; + if (from & CEPH_CAP_RD) c |= CEPH_CAP_WRBUFFER; + if (from & CEPH_CAP_RDCACHE) c |= CEPH_CAP_WRBUFFER|CEPH_CAP_WR; return c; } int wanted_conflicts() { return conflicts(wanted()); } diff --git a/src/mds/FileLock.h b/src/mds/FileLock.h index 09868f7563fb6..b2e409198bf69 100644 --- a/src/mds/FileLock.h +++ b/src/mds/FileLock.h @@ -159,52 +159,52 @@ class FileLock : public SimpleLock { // client caps allowed int caps_allowed_ever() { if (parent->is_auth()) - return CAP_FILE_RDCACHE | CAP_FILE_RD | CAP_FILE_WR | CAP_FILE_WREXTEND | CAP_FILE_WRBUFFER | CAP_FILE_LAZYIO; + return CEPH_CAP_RDCACHE | CEPH_CAP_RD | CEPH_CAP_WR | CEPH_CAP_WREXTEND | CEPH_CAP_WRBUFFER | CEPH_CAP_LAZYIO; else - return CAP_FILE_RDCACHE | CAP_FILE_RD | CAP_FILE_LAZYIO; + return CEPH_CAP_RDCACHE | CEPH_CAP_RD | CEPH_CAP_LAZYIO; } int caps_allowed() { if (parent->is_auth()) switch (state) { case LOCK_SYNC: - return CAP_FILE_RDCACHE | CAP_FILE_RD | CAP_FILE_LAZYIO; + return CEPH_CAP_RDCACHE | CEPH_CAP_RD | CEPH_CAP_LAZYIO; case LOCK_LOCK: case LOCK_GLOCKR: case LOCK_GLOCKL: - return CAP_FILE_RDCACHE; + return CEPH_CAP_RDCACHE; case LOCK_GLOCKM: return 0; case LOCK_MIXED: - return CAP_FILE_RD | CAP_FILE_WR | CAP_FILE_WREXTEND | CAP_FILE_LAZYIO; + return CEPH_CAP_RD | CEPH_CAP_WR | CEPH_CAP_WREXTEND | CEPH_CAP_LAZYIO; case LOCK_GMIXEDR: - return CAP_FILE_RD | CAP_FILE_LAZYIO; + return CEPH_CAP_RD | CEPH_CAP_LAZYIO; case LOCK_GMIXEDL: return 0; case LOCK_LONER: // single client writer, of course. - return CAP_FILE_RDCACHE | CAP_FILE_RD | CAP_FILE_WR | CAP_FILE_WREXTEND | CAP_FILE_WRBUFFER | CAP_FILE_LAZYIO; + return CEPH_CAP_RDCACHE | CEPH_CAP_RD | CEPH_CAP_WR | CEPH_CAP_WREXTEND | CEPH_CAP_WRBUFFER | CEPH_CAP_LAZYIO; case LOCK_GLONERR: - return CAP_FILE_RD | CAP_FILE_LAZYIO; + return CEPH_CAP_RD | CEPH_CAP_LAZYIO; case LOCK_GLONERM: - return CAP_FILE_RD | CAP_FILE_WR | CAP_FILE_WREXTEND | CAP_FILE_LAZYIO; + return CEPH_CAP_RD | CEPH_CAP_WR | CEPH_CAP_WREXTEND | CEPH_CAP_LAZYIO; case LOCK_GSYNCL: - return CAP_FILE_RDCACHE | CAP_FILE_LAZYIO; + return CEPH_CAP_RDCACHE | CEPH_CAP_LAZYIO; case LOCK_GSYNCM: - return CAP_FILE_RD | CAP_FILE_LAZYIO; + return CEPH_CAP_RD | CEPH_CAP_LAZYIO; } else switch (state) { case LOCK_SYNC: - return CAP_FILE_RDCACHE | CAP_FILE_RD | CAP_FILE_LAZYIO; + return CEPH_CAP_RDCACHE | CEPH_CAP_RD | CEPH_CAP_LAZYIO; case LOCK_LOCK: case LOCK_GLOCKR: - return CAP_FILE_RDCACHE; + return CEPH_CAP_RDCACHE; case LOCK_GMIXEDR: case LOCK_MIXED: - return CAP_FILE_RD | CAP_FILE_LAZYIO; + return CEPH_CAP_RD | CEPH_CAP_LAZYIO; } assert(0); return 0; diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc index ce027a694686c..860b1000ba12e 100644 --- a/src/mds/Locker.cc +++ b/src/mds/Locker.cc @@ -447,32 +447,30 @@ version_t Locker::issue_file_data_version(CInode *in) Capability* Locker::issue_new_caps(CInode *in, - int mode, - MClientRequest *req) + int mode, + Session *session) { dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl; // my needs - int my_client = req->get_client().num(); + assert(session->inst.name.is_client()); + int my_client = session->inst.name.num(); int my_want = 0; - if (mode & FILE_MODE_R) my_want |= CAP_FILE_RDCACHE | CAP_FILE_RD; - if (mode & FILE_MODE_W) my_want |= CAP_FILE_WRBUFFER | CAP_FILE_WR; + if (mode & FILE_MODE_R) my_want |= CEPH_CAP_RDCACHE | CEPH_CAP_RD; + if (mode & FILE_MODE_W) my_want |= CEPH_CAP_WRBUFFER | CEPH_CAP_WR; // register a capability Capability *cap = in->get_client_cap(my_client); if (!cap) { // new cap - Capability c(my_want); - in->add_client_cap(my_client, c); - cap = in->get_client_cap(my_client); - - // suppress file cap messages for new cap (we'll bundle with the open() reply) - cap->set_suppress(true); + cap = in->add_client_cap(my_client, in, session->caps); + cap->set_wanted(my_want); + cap->set_suppress(true); // suppress file cap messages for new cap (we'll bundle with the open() reply) } else { // make sure it has sufficient caps if (my_want & ~cap->wanted()) { // augment wanted caps for this client - cap->set_wanted( cap->wanted() | my_want ); + cap->set_wanted(cap->wanted() | my_want); } } @@ -498,14 +496,14 @@ Capability* Locker::issue_new_caps(CInode *in, int now = cap->pending(); if (before != now && - (before & CAP_FILE_WR) == 0 && - (now & CAP_FILE_WR)) { + (before & CEPH_CAP_WR) == 0 && + (now & CEPH_CAP_WR)) { // FIXME FIXME FIXME } // twiddle file_data_version? - if ((before & CAP_FILE_WRBUFFER) == 0 && - (now & CAP_FILE_WRBUFFER)) { + if ((before & CEPH_CAP_WRBUFFER) == 0 && + (now & CEPH_CAP_WRBUFFER)) { in->inode.file_data_version++; dout(7) << " incrementing file_data_version, now " << in->inode.file_data_version << " for " << *in << dendl; } @@ -538,8 +536,8 @@ bool Locker::issue_caps(CInode *in) int after = it->second.pending(); // twiddle file_data_version? - if (!(before & CAP_FILE_WRBUFFER) && - (after & CAP_FILE_WRBUFFER)) { + if (!(before & CEPH_CAP_WRBUFFER) && + (after & CEPH_CAP_WRBUFFER)) { dout(7) << " incrementing file_data_version for " << *in << dendl; in->inode.file_data_version++; } @@ -740,7 +738,7 @@ void Locker::handle_client_file_caps(MClientFileCaps *m) in->inode.atime = m->get_inode().atime; } - if ((has|had) & CAP_FILE_WR) { + if ((has|had) & CEPH_CAP_WR) { bool dirty = false; // mtime @@ -2423,7 +2421,7 @@ void Locker::file_eval(FileLock *lock) // * -> loner? if (!lock->is_rdlocked() && !lock->is_waiter_for(SimpleLock::WAIT_WR) && - (wanted & CAP_FILE_WR) && + (wanted & CEPH_CAP_WR) && loner && lock->get_state() != LOCK_LONER) { dout(7) << "file_eval stable, bump to loner " << *lock << " on " << *lock->get_parent() << dendl; @@ -2433,8 +2431,8 @@ void Locker::file_eval(FileLock *lock) // * -> mixed? else if (!lock->is_rdlocked() && !lock->is_waiter_for(SimpleLock::WAIT_WR) && - (wanted & CAP_FILE_RD) && - (wanted & CAP_FILE_WR) && + (wanted & CEPH_CAP_RD) && + (wanted & CEPH_CAP_WR) && !(loner && lock->get_state() == LOCK_LONER) && lock->get_state() != LOCK_MIXED) { dout(7) << "file_eval stable, bump to mixed " << *lock << " on " << *lock->get_parent() << dendl; @@ -2443,8 +2441,8 @@ void Locker::file_eval(FileLock *lock) // * -> sync? else if (!in->filelock.is_waiter_for(SimpleLock::WAIT_WR) && - !(wanted & (CAP_FILE_WR|CAP_FILE_WRBUFFER)) && - ((wanted & CAP_FILE_RD) || + !(wanted & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) && + ((wanted & CEPH_CAP_RD) || in->is_replicated() || (!loner && lock->get_state() == LOCK_LONER)) && lock->get_state() != LOCK_SYNC) { @@ -2473,7 +2471,7 @@ bool Locker::file_sync(FileLock *lock) int issued = in->get_caps_issued(); - assert((in->get_caps_wanted() & CAP_FILE_WR) == 0); + assert((in->get_caps_wanted() & CEPH_CAP_WR) == 0); if (lock->get_state() == LOCK_LOCK) { if (in->is_replicated()) { @@ -2491,7 +2489,7 @@ bool Locker::file_sync(FileLock *lock) else if (lock->get_state() == LOCK_MIXED) { // writers? - if (issued & CAP_FILE_WR) { + if (issued & CEPH_CAP_WR) { // gather client write caps lock->set_state(LOCK_GSYNCM); lock->get_parent()->auth_pin(); @@ -2512,7 +2510,7 @@ bool Locker::file_sync(FileLock *lock) else if (lock->get_state() == LOCK_LONER) { // writers? - if (issued & CAP_FILE_WR) { + if (issued & CEPH_CAP_WR) { // gather client write caps lock->set_state(LOCK_GSYNCL); lock->get_parent()->auth_pin(); @@ -2601,7 +2599,7 @@ void Locker::file_lock(FileLock *lock) } else if (lock->get_state() == LOCK_LONER) { - if (issued & CAP_FILE_WR) { + if (issued & CEPH_CAP_WR) { // change lock lock->set_state(LOCK_GLOCKL); lock->get_parent()->auth_pin(); @@ -2664,7 +2662,7 @@ void Locker::file_mixed(FileLock *lock) } else if (lock->get_state() == LOCK_LONER) { - if (issued & CAP_FILE_WRBUFFER) { + if (issued & CEPH_CAP_WRBUFFER) { // gather up WRBUFFER caps lock->set_state(LOCK_GMIXEDL); lock->get_parent()->auth_pin(); @@ -2786,7 +2784,7 @@ void Locker::handle_file_lock(FileLock *lock, MLock *m) lock->set_state(LOCK_GLOCKR); // call back caps? - if (issued & CAP_FILE_RD) { + if (issued & CEPH_CAP_RD) { dout(7) << "handle_file_lock client readers, gathering caps on " << *in << dendl; issue_caps(in); break; @@ -2811,7 +2809,7 @@ void Locker::handle_file_lock(FileLock *lock, MLock *m) if (lock->get_state() == LOCK_SYNC) { // MIXED - if (issued & CAP_FILE_RD) { + if (issued & CEPH_CAP_RD) { // call back client caps lock->set_state(LOCK_GMIXEDR); issue_caps(in); diff --git a/src/mds/Locker.h b/src/mds/Locker.h index a69055f49449e..2993f04c7cfd9 100644 --- a/src/mds/Locker.h +++ b/src/mds/Locker.h @@ -178,7 +178,7 @@ protected: // -- file i/o -- public: version_t issue_file_data_version(CInode *in); - Capability* issue_new_caps(CInode *in, int mode, MClientRequest *req); + Capability* issue_new_caps(CInode *in, int mode, Session *session); bool issue_caps(CInode *in); protected: diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index bf5947df107f5..4f54c7c65cca9 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -2651,21 +2651,20 @@ void MDCache::rejoin_import_cap(CInode *in, int client, inode_caps_reconnect_t& { dout(10) << "rejoin_import_cap for client" << client << " from mds" << frommds << " on " << *in << dendl; - + + Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(client)); + assert(session); + // add cap - in->reconnect_cap(client, icr); + in->reconnect_cap(client, icr, session->caps); // send REAP - // FIXME client session weirdness. MClientFileCaps *reap = new MClientFileCaps(MClientFileCaps::OP_IMPORT, in->inode, in->client_caps[client].get_last_seq(), in->client_caps[client].pending(), in->client_caps[client].wanted()); - - reap->set_mds( frommds ); // reap from whom? - Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(client)); - assert(session); + reap->set_mds(frommds); // reap from whom? mds->messenger->send_message(reap, session->inst); } diff --git a/src/mds/Server.cc b/src/mds/Server.cc index be4e02ab959d3..a4edca86400cb 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -310,7 +310,7 @@ void Server::handle_client_reconnect(MClientReconnect *m) if (in && in->is_auth()) { // we recovered it, and it's ours. take note. dout(15) << "open caps on " << *in << dendl; - in->reconnect_cap(from, p->second); + in->reconnect_cap(from, p->second, session->caps); reconnected_caps.insert(in); continue; } @@ -366,8 +366,8 @@ void Server::process_reconnected_caps() int issued = in->get_caps_issued(); if (in->is_auth()) { // wr? - if (issued & (CAP_FILE_WR|CAP_FILE_WRBUFFER)) { - if (issued & (CAP_FILE_RDCACHE|CAP_FILE_WRBUFFER)) { + if (issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) { + if (issued & (CEPH_CAP_RDCACHE|CEPH_CAP_WRBUFFER)) { in->filelock.set_state(LOCK_LONER); } else { in->filelock.set_state(LOCK_MIXED); @@ -375,7 +375,7 @@ void Server::process_reconnected_caps() } } else { // note that client should perform stale/reap cleanup during reconnect. - assert(issued & (CAP_FILE_WR|CAP_FILE_WRBUFFER) == 0); // ???? + assert(issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER) == 0); // ???? if (in->filelock.is_xlocked()) in->filelock.set_state(LOCK_LOCK); else @@ -3786,7 +3786,7 @@ void Server::_do_open(MDRequest *mdr, CInode *cur) // can we issue the caps they want? //version_t fdv = mds->locker->issue_file_data_version(cur); - Capability *cap = mds->locker->issue_new_caps(cur, cmode, req); + Capability *cap = mds->locker->issue_new_caps(cur, cmode, mdr->session); if (!cap) return; // can't issue (yet), so wait! dout(12) << "_do_open issuing caps " << cap_string(cap->pending()) diff --git a/src/mds/SessionMap.h b/src/mds/SessionMap.h index 61e0cfe297712..f864e8f834287 100644 --- a/src/mds/SessionMap.h +++ b/src/mds/SessionMap.h @@ -26,6 +26,7 @@ using __gnu_cxx::hash_map; #include "mdstypes.h" class CInode; +class Capability; /* * session @@ -43,8 +44,8 @@ public: int state; utime_t last_alive; // last alive - xlist::item xlist_item; entity_inst_t inst; + xlist::item session_list_item; bool is_opening() { return state == STATE_OPENING; } bool is_open() { return state == STATE_OPEN; } @@ -52,8 +53,9 @@ public: // -- caps -- private: - version_t cap_push_seq; // cap push seq # - xlist cap_inodes; // inodes with caps; front=most recently used + version_t cap_push_seq; // cap push seq # +public: + xlist caps; // inodes with caps; front=most recently used public: version_t inc_push_seq() { return ++cap_push_seq; } @@ -93,7 +95,7 @@ public: Session() : state(STATE_UNDEF), - xlist_item(this), + session_list_item(this), cap_push_seq(0) { } void _encode(bufferlist& bl) const { @@ -150,7 +152,7 @@ public: session_map.erase(s->inst.name); } void touch_session(Session *s) { - session_list.push_back(&s->xlist_item); + session_list.push_back(&s->session_list_item); } void get_client_set(set& s) { diff --git a/src/messages/MClientFileCaps.h b/src/messages/MClientFileCaps.h index dfeca48a1217c..d3cfdbd2a0ef3 100644 --- a/src/messages/MClientFileCaps.h +++ b/src/messages/MClientFileCaps.h @@ -16,7 +16,6 @@ #define __MCLIENTFILECAPS_H #include "msg/Message.h" -#include "mds/Capability.h" class MClientFileCaps : public Message { diff --git a/src/messages/MPing.h b/src/messages/MPing.h index 7a14800d85178..7ff3af49e1df8 100644 --- a/src/messages/MPing.h +++ b/src/messages/MPing.h @@ -18,23 +18,26 @@ #define __MPING_H #include "msg/Message.h" - +#include "include/encodable.h" class MPing : public Message { public: - int seq; - MPing(int s) : Message(CEPH_MSG_PING) { + __u64 seq; + utime_t stamp; + MPing(int s, utime_t w) : Message(CEPH_MSG_PING) { seq = s; + stamp = w; } MPing() : Message(CEPH_MSG_PING) {} void decode_payload() { - int off = 0; - payload.copy(0, sizeof(seq), (char*)&seq); - off += sizeof(seq); + bufferlist::iterator p = payload.begin(); + ::_decode_simple(seq, p); + ::_decode_simple(stamp, p); } void encode_payload() { - payload.append((char*)&seq, sizeof(seq)); + ::_encode_simple(seq, payload); + ::_encode_simple(stamp, payload); } const char *get_type_name() { return "ping"; } diff --git a/src/messages/MPingAck.h b/src/messages/MPingAck.h index 1f41b7fcd98b5..583147b708a4b 100644 --- a/src/messages/MPingAck.h +++ b/src/messages/MPingAck.h @@ -21,19 +21,27 @@ class MPingAck : public Message { public: - int seq; + __u64 seq; + utime_t sender_stamp; + utime_t reply_stamp; + MPingAck() {} - MPingAck(MPing *p) : Message(CEPH_MSG_PING_ACK) { - this->seq = p->seq; + MPingAck(MPing *p, utime_t w) : Message(CEPH_MSG_PING_ACK) { + seq = p->seq; + sender_stamp = p->stamp; + reply_stamp = w; } void decode_payload() { - int off = 0; - payload.copy(0, sizeof(seq), (char*)&seq); - off += sizeof(seq); + bufferlist::iterator p = payload.begin(); + ::_decode_simple(seq, p); + ::_decode_simple(sender_stamp, p); + ::_decode_simple(reply_stamp, p); } void encode_payload() { - payload.append((char*)&seq, sizeof(seq)); + ::_encode_simple(seq, payload); + ::_encode_simple(sender_stamp, payload); + ::_encode_simple(reply_stamp, payload); } const char *get_type_name() { return "pinga"; } -- 2.39.5