From: Sage Weil Date: Thu, 15 May 2008 02:26:07 +0000 (-0700) Subject: tearing up client cap code X-Git-Tag: v0.3~209^2~49^2~18 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=1c4343f3217a30a2ced8fa0c781a66b9bdb88813;p=ceph.git tearing up client cap code --- diff --git a/src/client/Client.cc b/src/client/Client.cc index a7723312d7e..9c5cc3b203b 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -317,7 +317,7 @@ void Client::update_inode(Inode *in, InodeStat *st, LeaseStat *lease, utime_t fr dout(12) << "update_inode mask " << lease->mask << " ttl " << ttl << dendl; if (lease->mask & CEPH_STAT_MASK_INODE) { - int issued = in->file_caps(); + int issued = in->caps_issued(); in->inode.ino = st->ino; in->inode.layout = st->layout; @@ -1150,13 +1150,13 @@ void Client::send_reconnect(int mds) p++) { if (p->second->caps.count(mds)) { dout(10) << " caps on " << p->first - << " " << cap_string(p->second->caps[mds].caps) - << " wants " << cap_string(p->second->file_caps_wanted()) + << " " << cap_string(p->second->caps[mds].issued) + << " wants " << cap_string(p->second->caps_wanted()) << dendl; p->second->caps[mds].seq = 0; // reset seq. m->add_inode_caps(p->first, // ino - p->second->file_caps_wanted(), // wanted - p->second->caps[mds].caps, // issued + p->second->caps_wanted(), // wanted + p->second->caps[mds].issued, // issued p->second->inode.size, p->second->inode.mtime, p->second->inode.atime); filepath path; p->second->make_path(path); @@ -1273,6 +1273,112 @@ void Client::release_lease(Inode *in, Dentry *dn, int mask) * caps */ + + +void Inode::get_cap_ref(int cap) +{ + int n = 0; + while (cap) { + if (cap & 1) { + int c = 1 << n; + cap_refs[c]++; + cout << "inode " << inode.ino << " get " << cap_string(c) << " " + << (cap_refs[c]-1) << " -> " << cap_refs[c] << std::endl; + } + cap >>= 1; + n++; + } +} + +bool Inode::put_cap_ref(int cap) +{ + bool last; + int n = 0; + while (cap) { + if (cap & 1) { + int c = 1 << n; + if (--cap_refs[c] == 0) + last = true; + cout << "inode " << inode.ino << " put " << cap_string(c) << " " + << (cap_refs[c]+1) << " -> " << cap_refs[c] << std::endl; + } + cap >>= 1; + n++; + } + return last; +} + +void Client::put_cap_ref(Inode *in, int cap) +{ + if (in->put_cap_ref(cap)) + check_caps(in); +} + +void Client::check_caps(Inode *in) +{ + int wanted = in->caps_wanted(); + int used = in->caps_used(); + + dout(10) << "check_caps on " << in->inode.ino + << " wanted " << cap_string(wanted) + << " used " << cap_string(used) + << dendl; + + for (map::iterator it = in->caps.begin(); + it != in->caps.end(); + it++) { + InodeCap &cap = it->second; + int revoking = cap.implemented & ~cap.issued; + + if (in->wanted_max_size > in->inode.max_size && + in->wanted_max_size > in->requested_max_size) + goto ack; + + /* completed revocation? */ + if (revoking && (revoking && used) == 0) { + dout(10) << "completed revocation of " << (cap.implemented & ~cap.issued) << dendl; + goto ack; + } + + /* approaching file_max? */ + if ((cap.issued & CEPH_CAP_WR) && + (in->inode.size << 1) >= in->inode.max_size) { + //(in->i_reported_size << 1) < in->inode.max_size) { + dout(10) << "size approaching max_size" << dendl; + goto ack; + } + + if ((cap.issued & ~wanted) == 0) + continue; /* nothing extra, all good */ + + /* + if (time_before(jiffies, ci->i_hold_caps_until)) { + // delaying cap release for a bit + dout(30, "delaying cap release\n"); + continue; + } + */ + + ack: + MClientFileCaps *m = new MClientFileCaps(CEPH_CAP_OP_ACK, + in->inode, + it->second.seq, + it->second.issued, + wanted); + m->set_max_size(in->wanted_max_size); + in->requested_max_size = in->wanted_max_size; + messenger->send_message(m, mdsmap->get_inst(it->first)); + if (wanted == 0) + mds_sessions[it->first].num_caps--; + } + + if (wanted == 0 && !in->caps.empty()) { + in->caps.clear(); + put_inode(in); + } +} + + class C_Client_ImplementedCaps : public Context { Client *client; MClientFileCaps *msg; @@ -1284,6 +1390,15 @@ public: } }; + +void signal_cond_list(list& ls) +{ + for (list::iterator it = ls.begin(); it != ls.end(); it++) + (*it)->Signal(); + ls.clear(); +} + + /** handle_file_caps * handle caps update from mds. including mds to mds caps transitions. * do not block. @@ -1335,7 +1450,7 @@ void Client::handle_file_caps(MClientFileCaps *m) mds_sessions[mds].num_caps++; if (in->caps.empty()) in->get(); in->caps[mds].seq = m->get_seq(); - in->caps[mds].caps = m->get_caps(); + in->caps[mds].issued = m->get_caps(); } assert(in->stale_caps.count(other)); @@ -1410,9 +1525,12 @@ void Client::handle_file_caps(MClientFileCaps *m) delete m; return; } + + InodeCap &cap = in->caps[mds]; + // don't want? - int wanted = in->file_caps_wanted(); + int wanted = in->caps_wanted(); if (wanted == 0) { dout(5) << "handle_file_caps on ino " << m->get_ino() << " seq " << m->get_seq() @@ -1425,11 +1543,11 @@ void Client::handle_file_caps(MClientFileCaps *m) return; } + int used = in->caps_used(); + // update per-mds caps - const int old_caps = in->caps[mds].caps; + const int old_caps = cap.issued; const int new_caps = m->get_caps(); - in->caps[mds].caps = new_caps; - in->caps[mds].seq = m->get_seq(); dout(5) << "handle_file_caps on in " << m->get_ino() << " mds" << mds << " seq " << m->get_seq() << " caps now " << cap_string(new_caps) @@ -1446,6 +1564,7 @@ void Client::handle_file_caps(MClientFileCaps *m) in->inode.time_warp_seq = m->get_time_warp_seq(); } + bool kick_writers = false; if (m->get_max_size() != in->inode.max_size) { dout(10) << "max_size " << in->inode.max_size << " -> " << m->get_max_size() << dendl; in->inode.max_size = m->get_max_size(); @@ -1453,71 +1572,56 @@ void Client::handle_file_caps(MClientFileCaps *m) in->wanted_max_size = 0; in->requested_max_size = 0; } + kick_writers = true; + } + + // update caps + cap.seq = m->get_seq(); + + bool ack = false; + bool invalidate = false; + bool writeback = false; + + if (old_caps & ~new_caps) { + dout(10) << " revocation of " << cap_string(~new_caps & old_caps) << dendl; + cap.issued = new_caps; + + if ((cap.issued & ~new_caps) & CEPH_CAP_RDCACHE) + invalidate = true; + if ((used & ~new_caps) & CEPH_CAP_WRBUFFER) + writeback = true; + else { + ack = true; + cap.implemented = new_caps; + + // share our (possibly newer) file size, mtime, atime + m->set_size(in->inode.size); + m->set_max_size(0); // dont re-request + m->set_mtime(in->inode.mtime); + m->set_atime(in->inode.atime); + m->set_wanted(wanted); + } + } else if (old_caps == new_caps) { + dout(10) << " caps unchanged at " << cap_string(old_caps) << dendl; + } else { + dout(10) << " grant, new caps are " << cap_string(new_caps & ~old_caps) << dendl; + cap.issued = cap.implemented = new_caps; } - // share our (possibly newer) file size, mtime, atime - m->set_size(in->inode.size); - m->set_mtime(in->inode.mtime); - m->set_atime(in->inode.atime); + // wake up waiters + if (new_caps & CEPH_CAP_RD) signal_cond_list(in->waitfor_read); + if ((new_caps & CEPH_CAP_WR) || kick_writers) signal_cond_list(in->waitfor_write); + if ((new_caps & CEPH_CAP_LAZYIO) || kick_writers) signal_cond_list(in->waitfor_lazy); - if (g_conf.client_oc) { - // caching on, use FileCache. - Context *onimplement = 0; - if (old_caps & ~new_caps) { // this mds is revoking caps - if (in->fc.get_caps() & ~(in->file_caps())) // net revocation - onimplement = new C_Client_ImplementedCaps(this, m, in); - else { - implemented_caps(m, in); // ack now. - } - } - in->fc.set_caps(new_caps, onimplement); - } else { - // caching off. + if (ack) + messenger->send_message(m, m->get_source_inst()); - // wake up waiters? - if (new_caps & CEPH_CAP_RD) { - for (list::iterator it = in->waitfor_read.begin(); - it != in->waitfor_read.end(); - it++) { - dout(5) << "signaling read waiter " << *it << dendl; - (*it)->Signal(); - } - in->waitfor_read.clear(); - } - if (new_caps & CEPH_CAP_WR) { - for (list::iterator it = in->waitfor_write.begin(); - it != in->waitfor_write.end(); - it++) { - dout(5) << "signaling write waiter " << *it << dendl; - (*it)->Signal(); - } - in->waitfor_write.clear(); - } - if (new_caps & CEPH_CAP_LAZYIO) { - for (list::iterator it = in->waitfor_lazy.begin(); - it != in->waitfor_lazy.end(); - it++) { - dout(5) << "signaling lazy waiter " << *it << dendl; - (*it)->Signal(); - } - in->waitfor_lazy.clear(); - } + if (invalidate) + in->fc.release_clean(); + + if (writeback) + in->fc.flush_dirty(); - // ack? - if (old_caps & ~new_caps) { - if (in->sync_writes) { - // wait for sync writes to finish - dout(5) << "sync writes in progress, will ack on finish" << dendl; - in->waitfor_no_write.push_back(new C_Client_ImplementedCaps(this, m, in)); - } else { - // ok now - implemented_caps(m, in); - } - } else { - // discard - delete m; - } - } } void Client::implemented_caps(MClientFileCaps *m, Inode *in) @@ -1529,69 +1633,6 @@ void Client::implemented_caps(MClientFileCaps *m, Inode *in) } -void Client::release_caps(Inode *in, - int retain) -{ - int wanted = in->file_caps_wanted(); - dout(5) << "releasing caps on ino " << in->inode.ino << dec - << " had " << cap_string(in->file_caps()) - << " retaining " << cap_string(retain) - << " want " << cap_string(wanted) - << dendl; - - for (map::iterator it = in->caps.begin(); - it != in->caps.end(); - it++) { - //if (it->second.caps & ~retain) { - if (1) { - // release (some of?) these caps - it->second.caps = retain & it->second.caps; - // note: tell mds _full_ wanted; it'll filter/behave based on what it is allowed to do - MClientFileCaps *m = new MClientFileCaps(CEPH_CAP_OP_ACK, - in->inode, - it->second.seq, - it->second.caps, - wanted); - messenger->send_message(m, mdsmap->get_inst(it->first)); - } - if (wanted == 0) - mds_sessions[it->first].num_caps--; - } - if (wanted == 0 && !in->caps.empty()) { - in->caps.clear(); - put_inode(in); - } -} - -void Client::update_caps_wanted(Inode *in) -{ - int wanted = in->file_caps_wanted(); - dout(5) << "updating caps wanted on ino " << in->inode.ino - << " to " << cap_string(wanted) - << " max_size " << in->wanted_max_size - << dendl; - - // FIXME: pick a single mds and let the others off the hook.. - for (map::iterator it = in->caps.begin(); - it != in->caps.end(); - it++) { - MClientFileCaps *m = new MClientFileCaps(CEPH_CAP_OP_ACK, - in->inode, - it->second.seq, - it->second.caps, - wanted); - m->set_max_size(in->wanted_max_size); - in->requested_max_size = in->wanted_max_size; - messenger->send_message(m, mdsmap->get_inst(it->first)); - if (wanted == 0) - mds_sessions[it->first].num_caps--; - } - if (wanted == 0 && !in->caps.empty()) { - in->caps.clear(); - put_inode(in); - } -} - // ------------------- @@ -1746,7 +1787,7 @@ int Client::unmount() in->fc.empty(new C_Client_CloseRelease(this, in)); } else { dout(10) << "unmount residual caps on " << in->ino() << ", releasing" << dendl; - release_caps(in); + check_caps(in); } } } @@ -2295,7 +2336,7 @@ int Client::_utimes(const filepath &path, utime_t mtime, utime_t atime, bool fol Dentry *dn = lookup(path); int want = CEPH_CAP_WR|CEPH_CAP_WRBUFFER|CEPH_CAP_EXCL; if (dn && dn->inode && - (dn->inode->file_caps() & want) == want) { + (dn->inode->caps_issued() & want) == want) { dout(5) << " have WR and EXCL caps, just updating our m/atime" << dendl; dn->inode->inode.time_warp_seq++; dn->inode->inode.mtime = mtime; @@ -2714,7 +2755,7 @@ int Client::_open(const filepath &path, int flags, mode_t mode, Fh **fhp, int ui Inode *in = 0; if (dn) { in = dn->inode; - in->add_open(cmode); // make note of pending open, since it effects _wanted_ caps. + in->get_open_ref(cmode); // make note of pending open, since it effects _wanted_ caps. } in = 0; @@ -2737,7 +2778,7 @@ int Client::_open(const filepath &path, int flags, mode_t mode, Fh **fhp, int ui f->inode = in; f->inode->get(); if (!dn) - in->add_open(f->mode); // i may have alrady added it above! + in->get_open_ref(f->mode); // i may have alrady added it above! dout(10) << in->inode.ino << " wr " << in->num_open_wr << " rd " << in->num_open_rd << " dirty " << in->fc.is_dirty() << " cached " << in->fc.is_cached() << dendl; @@ -2753,11 +2794,11 @@ int Client::_open(const filepath &path, int flags, mode_t mode, Fh **fhp, int ui if (in->caps.count(mds) == 0) mds_sessions[mds].num_caps++; - int new_caps = reply->get_file_caps(); + int new_caps = reply->get_caps_issued(); - assert(reply->get_file_caps_seq() >= in->caps[mds].seq); - if (reply->get_file_caps_seq() > in->caps[mds].seq) { - int old_caps = in->caps[mds].caps; + assert(reply->get_file_caps_seq() >= cap.seq); + if (reply->get_file_caps_seq() > cap.seq) { + int old_caps = cap.caps; dout(7) << "open got caps " << cap_string(new_caps) << " (had " << cap_string(old_caps) << ")" @@ -2766,12 +2807,12 @@ int Client::_open(const filepath &path, int flags, mode_t mode, Fh **fhp, int ui << " from mds" << mds << dendl; - in->caps[mds].caps = new_caps; - in->caps[mds].seq = reply->get_file_caps_seq(); + cap.caps = new_caps; + cap.seq = reply->get_file_caps_seq(); // we shouldn't ever lose caps at this point. // actually, we might...? - assert((old_caps & ~in->caps[mds].caps) == 0); + assert((old_caps & ~cap.caps) == 0); if (g_conf.client_oc) in->fc.set_caps(new_caps); @@ -2784,7 +2825,7 @@ int Client::_open(const filepath &path, int flags, mode_t mode, Fh **fhp, int ui << dendl; } - dout(5) << "open success, fh is " << f << " combined caps " << cap_string(in->file_caps()) << dendl; + dout(5) << "open success, fh is " << f << " combined caps " << cap_string(in->caps_issued()) << dendl; } delete reply; @@ -2846,15 +2887,15 @@ int Client::_release(Fh *f) Inode *in = f->inode; // update inode rd/wr counts - int before = in->file_caps_wanted(); - in->sub_open(f->mode); - int after = in->file_caps_wanted(); + int before = in->caps_wanted(); + in->put_open_ref(f->mode); + int after = in->caps_wanted(); delete f; // does this change what caps we want? if (before != after && after) - update_caps_wanted(in); + check_caps(in); // release caps right away? dout(10) << "num_open_rd " << in->num_open_rd << " num_open_wr " << in->num_open_wr << dendl; @@ -2998,7 +3039,7 @@ int Client::_read(Fh *f, __s64 offset, __u64 size, bufferlist *bl) if (lazy) { // wait for lazy cap - if ((in->file_caps() & CEPH_CAP_LAZYIO) == 0) { + if ((in->caps_issued() & CEPH_CAP_LAZYIO) == 0) { dout(7) << " don't have lazy cap, waiting" << dendl; Cond cond; in->waitfor_lazy.push_back(&cond); @@ -3007,14 +3048,14 @@ int Client::_read(Fh *f, __s64 offset, __u64 size, bufferlist *bl) } } else { // wait for RD cap? - while ((in->file_caps() & CEPH_CAP_RD) == 0) { + while ((in->caps_issued() & CEPH_CAP_RD) == 0) { dout(7) << " don't have read cap, waiting" << dendl; goto wait; } } // async i/o? - if ((in->file_caps() & (CEPH_CAP_WRBUFFER|CEPH_CAP_RDCACHE))) { + if ((in->caps_issued() & (CEPH_CAP_WRBUFFER|CEPH_CAP_RDCACHE))) { // FIXME: this logic needs to move info FileCache! @@ -3109,14 +3150,7 @@ void Client::sync_write_commit(Inode *in) unsafe_sync_write--; in->uncommitted_writes--; - if (in->uncommitted_writes == 0) { - dout(15) << "sync_write_commit all sync writes committed on ino " << in->inode.ino << dendl; - for (list::iterator p = in->waitfor_commit.begin(); - p != in->waitfor_commit.end(); - p++) - (*p)->Signal(); - in->waitfor_commit.clear(); - } + cl->put_cap_ref(in, CEPH_CAP_WRBUFFER); dout(15) << "sync_write_commit unsafe_sync_write = " << unsafe_sync_write << dendl; if (unsafe_sync_write == 0 && unmounting) { @@ -3177,80 +3211,54 @@ int Client::_write(Fh *f, __s64 offset, __u64 size, const char *buf) bufferlist blist; blist.push_back( bp ); - if (g_conf.client_oc) { // buffer cache ON? - assert(objectcacher); - - // write (this may block!) - in->fc.write(offset, size, blist, client_lock); - - } else { - // legacy, inconsistent synchronous write. - dout(7) << "synchronous write" << dendl; - - // do we have write file cap? - __u64 endoff = offset + size; - - if ((endoff >= in->inode.max_size || - endoff > (in->inode.size << 1)) && - endoff > in->wanted_max_size) { - dout(10) << "wanted_max_size " << in->wanted_max_size << " -> " << endoff << dendl; - in->wanted_max_size = endoff; - update_caps_wanted(in); - } - - while (!lazy && - ((in->file_caps() & CEPH_CAP_WR) == 0 || - endoff > in->inode.max_size)) { - dout(7) << " don't have write cap for endoff " << endoff - << " (max " << in->inode.max_size << "), waiting" << dendl; - Cond cond; - in->waitfor_write.push_back(&cond); - cond.Wait(client_lock); - } - while (lazy && (in->file_caps() & CEPH_CAP_LAZYIO) == 0) { - dout(7) << " don't have lazy cap, waiting" << dendl; - Cond cond; + // request larger max_size? + __u64 endoff = offset + size; + if ((endoff >= in->inode.max_size || + endoff > (in->inode.size << 1)) && + endoff > in->wanted_max_size) { + dout(10) << "wanted_max_size " << in->wanted_max_size << " -> " << endoff << dendl; + in->wanted_max_size = endoff; + check_caps(in); + } + + // wait for caps, max_size + while ((lazy && (in->caps_issued() & CEPH_CAP_LAZYIO) == 0) || + (!lazy && (in->caps_issued() & CEPH_CAP_WR) == 0) || + endoff > in->inode.max_size) { + dout(7) << "missing wr|lazy cap OR endoff " << endoff + << " > max_size " << in->inode.max_size + << ", waiting" << dendl; + Cond cond; + if (lazy) in->waitfor_lazy.push_back(&cond); - cond.Wait(client_lock); - } + else + in->waitfor_write.push_back(&cond); + cond.Wait(client_lock); + } - // avoid livelock with fsync - if (in->uncommitted_writes > 0 && - !in->waitfor_commit.empty()) - _fsync(f, true); + in->get_cap_ref(CEPH_CAP_WR); - // prepare write + // avoid livelock with fsync? + + if (g_conf.client_oc) { + // buffer cache + in->fc.write(offset, size, blist, client_lock); + } else { + // simple, non-atomic sync write Cond cond; bool done = false; Context *onfinish = new C_Cond(&cond, &done); - in->get(); Context *onsafe = new C_Client_SyncCommit(this, in); + unsafe_sync_write++; - in->sync_writes++; - - dout(20) << " sync write start " << onfinish << dendl; + in->get(); + in->get_cap_ref(CEPH_CAP_WRBUFFER); filer->write(in->inode, offset, size, blist, 0, - onfinish, onsafe - //, 1+((int)g_clock.now()) / 10 //f->pos // hack hack test osd revision snapshots - ); + onfinish, onsafe); - while (!done) { + while (!done) cond.Wait(client_lock); - dout(20) << " sync write bump " << onfinish << dendl; - } - - in->sync_writes--; - if (in->sync_writes == 0 && - !in->waitfor_no_write.empty()) { - for (list::iterator i = in->waitfor_no_write.begin(); - i != in->waitfor_no_write.end(); - i++) - (*i)->finish(0); - in->waitfor_no_write.clear(); - } - - dout(20) << " sync write done " << onfinish << dendl; } // time @@ -3275,6 +3283,8 @@ int Client::_write(Fh *f, __s64 offset, __u64 size, const char *buf) // mtime in->inode.mtime = g_clock.real_now(); + put_cap_ref(in, CEPH_CAP_WR); + // ok! return totalwritten; } @@ -3482,7 +3492,7 @@ int Client::lazyio_propogate(int fd, off_t offset, size_t count) if (f->mode & CEPH_FILE_MODE_LAZY) { // wait for lazy cap - while ((in->file_caps() & CEPH_CAP_LAZYIO) == 0) { + while ((in->caps_issued() & CEPH_CAP_LAZYIO) == 0) { dout(7) << " don't have lazy cap, waiting" << dendl; Cond cond; in->waitfor_lazy.push_back(&cond); @@ -3518,7 +3528,7 @@ int Client::lazyio_synchronize(int fd, off_t offset, size_t count) if (f->mode & CEPH_FILE_MODE_LAZY) { // wait for lazy cap - while ((in->file_caps() & CEPH_CAP_LAZYIO) == 0) { + while ((in->caps_issued() & CEPH_CAP_LAZYIO) == 0) { dout(7) << " don't have lazy cap, waiting" << dendl; Cond cond; in->waitfor_lazy.push_back(&cond); diff --git a/src/client/Client.h b/src/client/Client.h index 0a9f28f6346..561c2d359ab 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -127,9 +127,10 @@ class Dir { class InodeCap { public: - int caps; + unsigned issued; + unsigned implemented; unsigned seq; - InodeCap() : caps(0), seq(0) {} + InodeCap() : issued(0), implemented(0), seq(0) {} }; @@ -148,7 +149,9 @@ class Inode { map caps; // mds -> InodeCap map stale_caps; // mds -> cap .. stale - int num_open_rd, num_open_wr, num_open_lazy; // num readers, writers + int open_by_mode[CEPH_FILE_MODE_NUM]; + map cap_refs; + __u64 wanted_max_size, requested_max_size; int ref; // ref count. 1 for each dentry, fh that links to me. @@ -162,10 +165,6 @@ class Inode { // for caching i/o mode FileCache fc; - // for sync i/o mode - int sync_reads; // sync reads in progress - int sync_writes; // sync writes in progress - int uncommitted_writes; // sync writes missing commits list waitfor_write; list waitfor_read; @@ -207,14 +206,13 @@ class Inode { //inode(_inode), lease_mask(0), lease_mds(-1), dir_auth(-1), dir_hashed(false), dir_replicated(false), - num_open_rd(0), num_open_wr(0), num_open_lazy(0), wanted_max_size(0), requested_max_size(0), ref(0), ll_ref(0), dir(0), dn(0), symlink(0), fc(_oc, ino, layout), - sync_reads(0), sync_writes(0), uncommitted_writes(0), hack_balance_reads(false) { + memset(open_by_mode, 0, sizeof(int)*CEPH_FILE_MODE_NUM); inode.ino = ino; } ~Inode() { @@ -225,34 +223,60 @@ class Inode { bool is_dir() { return inode.is_dir(); } - int file_caps() { + + // CAPS -------- + void get_open_ref(int mode) { + open_by_mode[mode]++; + } + bool put_open_ref(int mode) { + if (--open_by_mode[mode] == 0) + return true; + return false; + } + + void get_cap_ref(int cap); + bool put_cap_ref(int cap); + + int caps_issued() { int c = 0; for (map::iterator it = caps.begin(); it != caps.end(); it++) - c |= it->second.caps; + c |= it->second.issued; for (map::iterator it = stale_caps.begin(); it != stale_caps.end(); it++) - c |= it->second.caps; + c |= it->second.issued; return c; } - int file_caps_wanted() { + int caps_used() { int w = 0; - if (num_open_rd) w |= CEPH_CAP_RD|CEPH_CAP_RDCACHE; - if (num_open_wr) w |= CEPH_CAP_WR|CEPH_CAP_WRBUFFER|CEPH_CAP_EXCL; - if (num_open_lazy) w |= CEPH_CAP_LAZYIO; - if (fc.is_dirty()) w |= CEPH_CAP_WRBUFFER|CEPH_CAP_EXCL; - if (fc.is_cached()) w |= CEPH_CAP_RDCACHE; + for (map::iterator p = cap_refs.begin(); + p != cap_refs.end(); + p++) + w |= p->first; return w; } + int caps_file_wanted() { + int want = 0; + for (int mode = 0; mode < 4; mode++) + if (open_by_mode[mode]) + want |= ceph_caps_for_mode(mode); + return want; + } + int caps_wanted() { + int want = caps_file_wanted() | caps_used(); + if (want & CEPH_CAP_WRBUFFER) + want |= CEPH_CAP_EXCL; + return want; + } int get_effective_lease_mask(utime_t now) { int havemask = 0; if (now < lease_ttl && lease_mds >= 0) havemask |= lease_mask; - if (file_caps() & CEPH_CAP_EXCL) + if (caps_issued() & CEPH_CAP_EXCL) havemask |= CEPH_LOCK_ICONTENT; if (havemask & CEPH_LOCK_ICONTENT) havemask |= CEPH_LOCK_ICONTENT; // hack: if we have one, we have both, for the purposes of below @@ -261,9 +285,9 @@ class Inode { bool have_valid_size() { // RD+RDCACHE or WR+WRBUFFER => valid size - if ((file_caps() & (CEPH_CAP_RD|CEPH_CAP_RDCACHE)) == (CEPH_CAP_RD|CEPH_CAP_RDCACHE)) + if ((caps_issued() & (CEPH_CAP_RD|CEPH_CAP_RDCACHE)) == (CEPH_CAP_RD|CEPH_CAP_RDCACHE)) return true; - if ((file_caps() & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) == (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) + if ((caps_issued() & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) == (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) return true; // otherwise, look for lease or EXCL... if (get_effective_lease_mask(g_clock.now()) & CEPH_LOCK_ICONTENT) @@ -271,17 +295,7 @@ class Inode { return false; } - void add_open(int cmode) { - if (cmode & CEPH_FILE_MODE_RD) num_open_rd++; - if (cmode & CEPH_FILE_MODE_WR) num_open_wr++; - if (cmode & CEPH_FILE_MODE_LAZY) num_open_lazy++; - } - void sub_open(int cmode) { - if (cmode & CEPH_FILE_MODE_RD) num_open_rd--; - if (cmode & CEPH_FILE_MODE_WR) num_open_wr--; - if (cmode & CEPH_FILE_MODE_LAZY) num_open_lazy--; - } - + int authority(const string& dname) { if (!dirfragtree.empty()) { __gnu_cxx::hash H; @@ -728,9 +742,14 @@ protected: // file caps void handle_file_caps(class MClientFileCaps *m); + void check_caps(Inode *in); + void put_cap_ref(Inode *in, int cap); + + void implemented_caps(class MClientFileCaps *m, Inode *in); void release_caps(Inode *in, int retain=0); - void update_caps_wanted(Inode *in); + + void close_release(Inode *in); void close_safe(Inode *in); diff --git a/src/client/FileCache.cc b/src/client/FileCache.cc index 3f6a345f592..ff0c38c7a33 100644 --- a/src/client/FileCache.cc +++ b/src/client/FileCache.cc @@ -220,15 +220,6 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_ void FileCache::write(off_t offset, size_t size, bufferlist& blist, Mutex& client_lock) { - // can i write - while ((latest_caps & CEPH_CAP_WR) == 0) { - dout(10) << "write doesn't have WR cap, blocking" << dendl; - Cond c; - waitfor_write.insert(&c); - c.Wait(client_lock); - waitfor_write.erase(&c); - } - // inc writing counter num_writing++; diff --git a/src/osdc/ObjectCacher.cc b/src/osdc/ObjectCacher.cc index 2b6e67dfc9a..4f611298f93 100644 --- a/src/osdc/ObjectCacher.cc +++ b/src/osdc/ObjectCacher.cc @@ -915,8 +915,9 @@ int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino) // blocking wait for write. -void ObjectCacher::wait_for_write(size_t len, Mutex& lock) +bool ObjectCacher::wait_for_write(size_t len, Mutex& lock) { + int blocked = 0; while (get_stat_dirty() + get_stat_tx() >= g_conf.client_oc_max_dirty) { dout(10) << "wait_for_write waiting on " << len << ", dirty|tx " << (get_stat_dirty() + get_stat_tx()) @@ -926,8 +927,10 @@ void ObjectCacher::wait_for_write(size_t len, Mutex& lock) stat_waiter++; stat_cond.Wait(lock); stat_waiter--; + blocked++; dout(10) << "wait_for_write woke up" << dendl; } + return blocked; } void ObjectCacher::flusher_entry() diff --git a/src/osdc/ObjectCacher.h b/src/osdc/ObjectCacher.h index 196687f9209..2f45ba7c903 100644 --- a/src/osdc/ObjectCacher.h +++ b/src/osdc/ObjectCacher.h @@ -456,7 +456,7 @@ class ObjectCacher { int writex(Objecter::OSDWrite *wr, inodeno_t ino); // write blocking - void wait_for_write(size_t len, Mutex& lock); + bool wait_for_write(size_t len, Mutex& lock); // blocking. atomic+sync. int atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& lock);