dout(12) << "update_inode mask " << lease->mask << " ttl " << ttl << dendl;
if (lease->mask & CEPH_STAT_MASK_INODE) {
- int issued = in->file_caps();
+ int issued = in->caps_issued();
in->inode.ino = st->ino;
in->inode.layout = st->layout;
p++) {
if (p->second->caps.count(mds)) {
dout(10) << " caps on " << p->first
- << " " << cap_string(p->second->caps[mds].caps)
- << " wants " << cap_string(p->second->file_caps_wanted())
+ << " " << cap_string(p->second->caps[mds].issued)
+ << " wants " << cap_string(p->second->caps_wanted())
<< dendl;
p->second->caps[mds].seq = 0; // reset seq.
m->add_inode_caps(p->first, // ino
- p->second->file_caps_wanted(), // wanted
- p->second->caps[mds].caps, // issued
+ p->second->caps_wanted(), // wanted
+ p->second->caps[mds].issued, // issued
p->second->inode.size, p->second->inode.mtime, p->second->inode.atime);
filepath path;
p->second->make_path(path);
* caps
*/
+
+
+void Inode::get_cap_ref(int cap)
+{
+ int n = 0;
+ while (cap) {
+ if (cap & 1) {
+ int c = 1 << n;
+ cap_refs[c]++;
+ cout << "inode " << inode.ino << " get " << cap_string(c) << " "
+ << (cap_refs[c]-1) << " -> " << cap_refs[c] << std::endl;
+ }
+ cap >>= 1;
+ n++;
+ }
+}
+
+bool Inode::put_cap_ref(int cap)
+{
+ bool last;
+ int n = 0;
+ while (cap) {
+ if (cap & 1) {
+ int c = 1 << n;
+ if (--cap_refs[c] == 0)
+ last = true;
+ cout << "inode " << inode.ino << " put " << cap_string(c) << " "
+ << (cap_refs[c]+1) << " -> " << cap_refs[c] << std::endl;
+ }
+ cap >>= 1;
+ n++;
+ }
+ return last;
+}
+
+void Client::put_cap_ref(Inode *in, int cap)
+{
+ if (in->put_cap_ref(cap))
+ check_caps(in);
+}
+
+void Client::check_caps(Inode *in)
+{
+ int wanted = in->caps_wanted();
+ int used = in->caps_used();
+
+ dout(10) << "check_caps on " << in->inode.ino
+ << " wanted " << cap_string(wanted)
+ << " used " << cap_string(used)
+ << dendl;
+
+ for (map<int,InodeCap>::iterator it = in->caps.begin();
+ it != in->caps.end();
+ it++) {
+ InodeCap &cap = it->second;
+ int revoking = cap.implemented & ~cap.issued;
+
+ if (in->wanted_max_size > in->inode.max_size &&
+ in->wanted_max_size > in->requested_max_size)
+ goto ack;
+
+ /* completed revocation? */
+ if (revoking && (revoking && used) == 0) {
+ dout(10) << "completed revocation of " << (cap.implemented & ~cap.issued) << dendl;
+ goto ack;
+ }
+
+ /* approaching file_max? */
+ if ((cap.issued & CEPH_CAP_WR) &&
+ (in->inode.size << 1) >= in->inode.max_size) {
+ //(in->i_reported_size << 1) < in->inode.max_size) {
+ dout(10) << "size approaching max_size" << dendl;
+ goto ack;
+ }
+
+ if ((cap.issued & ~wanted) == 0)
+ continue; /* nothing extra, all good */
+
+ /*
+ if (time_before(jiffies, ci->i_hold_caps_until)) {
+ // delaying cap release for a bit
+ dout(30, "delaying cap release\n");
+ continue;
+ }
+ */
+
+ ack:
+ MClientFileCaps *m = new MClientFileCaps(CEPH_CAP_OP_ACK,
+ in->inode,
+ it->second.seq,
+ it->second.issued,
+ wanted);
+ m->set_max_size(in->wanted_max_size);
+ in->requested_max_size = in->wanted_max_size;
+ messenger->send_message(m, mdsmap->get_inst(it->first));
+ if (wanted == 0)
+ mds_sessions[it->first].num_caps--;
+ }
+
+ if (wanted == 0 && !in->caps.empty()) {
+ in->caps.clear();
+ put_inode(in);
+ }
+}
+
+
class C_Client_ImplementedCaps : public Context {
Client *client;
MClientFileCaps *msg;
}
};
+
+void signal_cond_list(list<Cond*>& ls)
+{
+ for (list<Cond*>::iterator it = ls.begin(); it != ls.end(); it++)
+ (*it)->Signal();
+ ls.clear();
+}
+
+
/** handle_file_caps
* handle caps update from mds. including mds to mds caps transitions.
* do not block.
mds_sessions[mds].num_caps++;
if (in->caps.empty()) in->get();
in->caps[mds].seq = m->get_seq();
- in->caps[mds].caps = m->get_caps();
+ in->caps[mds].issued = m->get_caps();
}
assert(in->stale_caps.count(other));
delete m;
return;
}
+
+ InodeCap &cap = in->caps[mds];
+
// don't want?
- int wanted = in->file_caps_wanted();
+ int wanted = in->caps_wanted();
if (wanted == 0) {
dout(5) << "handle_file_caps on ino " << m->get_ino()
<< " seq " << m->get_seq()
return;
}
+ int used = in->caps_used();
+
// update per-mds caps
- const int old_caps = in->caps[mds].caps;
+ const int old_caps = cap.issued;
const int new_caps = m->get_caps();
- in->caps[mds].caps = new_caps;
- in->caps[mds].seq = m->get_seq();
dout(5) << "handle_file_caps on in " << m->get_ino()
<< " mds" << mds << " seq " << m->get_seq()
<< " caps now " << cap_string(new_caps)
in->inode.time_warp_seq = m->get_time_warp_seq();
}
+ bool kick_writers = false;
if (m->get_max_size() != in->inode.max_size) {
dout(10) << "max_size " << in->inode.max_size << " -> " << m->get_max_size() << dendl;
in->inode.max_size = m->get_max_size();
in->wanted_max_size = 0;
in->requested_max_size = 0;
}
+ kick_writers = true;
+ }
+
+ // update caps
+ cap.seq = m->get_seq();
+
+ bool ack = false;
+ bool invalidate = false;
+ bool writeback = false;
+
+ if (old_caps & ~new_caps) {
+ dout(10) << " revocation of " << cap_string(~new_caps & old_caps) << dendl;
+ cap.issued = new_caps;
+
+ if ((cap.issued & ~new_caps) & CEPH_CAP_RDCACHE)
+ invalidate = true;
+ if ((used & ~new_caps) & CEPH_CAP_WRBUFFER)
+ writeback = true;
+ else {
+ ack = true;
+ cap.implemented = new_caps;
+
+ // share our (possibly newer) file size, mtime, atime
+ m->set_size(in->inode.size);
+ m->set_max_size(0); // dont re-request
+ m->set_mtime(in->inode.mtime);
+ m->set_atime(in->inode.atime);
+ m->set_wanted(wanted);
+ }
+ } else if (old_caps == new_caps) {
+ dout(10) << " caps unchanged at " << cap_string(old_caps) << dendl;
+ } else {
+ dout(10) << " grant, new caps are " << cap_string(new_caps & ~old_caps) << dendl;
+ cap.issued = cap.implemented = new_caps;
}
- // share our (possibly newer) file size, mtime, atime
- m->set_size(in->inode.size);
- m->set_mtime(in->inode.mtime);
- m->set_atime(in->inode.atime);
+ // wake up waiters
+ if (new_caps & CEPH_CAP_RD) signal_cond_list(in->waitfor_read);
+ if ((new_caps & CEPH_CAP_WR) || kick_writers) signal_cond_list(in->waitfor_write);
+ if ((new_caps & CEPH_CAP_LAZYIO) || kick_writers) signal_cond_list(in->waitfor_lazy);
- if (g_conf.client_oc) {
- // caching on, use FileCache.
- Context *onimplement = 0;
- if (old_caps & ~new_caps) { // this mds is revoking caps
- if (in->fc.get_caps() & ~(in->file_caps())) // net revocation
- onimplement = new C_Client_ImplementedCaps(this, m, in);
- else {
- implemented_caps(m, in); // ack now.
- }
- }
- in->fc.set_caps(new_caps, onimplement);
- } else {
- // caching off.
+ if (ack)
+ messenger->send_message(m, m->get_source_inst());
- // wake up waiters?
- if (new_caps & CEPH_CAP_RD) {
- for (list<Cond*>::iterator it = in->waitfor_read.begin();
- it != in->waitfor_read.end();
- it++) {
- dout(5) << "signaling read waiter " << *it << dendl;
- (*it)->Signal();
- }
- in->waitfor_read.clear();
- }
- if (new_caps & CEPH_CAP_WR) {
- for (list<Cond*>::iterator it = in->waitfor_write.begin();
- it != in->waitfor_write.end();
- it++) {
- dout(5) << "signaling write waiter " << *it << dendl;
- (*it)->Signal();
- }
- in->waitfor_write.clear();
- }
- if (new_caps & CEPH_CAP_LAZYIO) {
- for (list<Cond*>::iterator it = in->waitfor_lazy.begin();
- it != in->waitfor_lazy.end();
- it++) {
- dout(5) << "signaling lazy waiter " << *it << dendl;
- (*it)->Signal();
- }
- in->waitfor_lazy.clear();
- }
+ if (invalidate)
+ in->fc.release_clean();
+
+ if (writeback)
+ in->fc.flush_dirty();
- // ack?
- if (old_caps & ~new_caps) {
- if (in->sync_writes) {
- // wait for sync writes to finish
- dout(5) << "sync writes in progress, will ack on finish" << dendl;
- in->waitfor_no_write.push_back(new C_Client_ImplementedCaps(this, m, in));
- } else {
- // ok now
- implemented_caps(m, in);
- }
- } else {
- // discard
- delete m;
- }
- }
}
void Client::implemented_caps(MClientFileCaps *m, Inode *in)
}
-void Client::release_caps(Inode *in,
- int retain)
-{
- int wanted = in->file_caps_wanted();
- dout(5) << "releasing caps on ino " << in->inode.ino << dec
- << " had " << cap_string(in->file_caps())
- << " retaining " << cap_string(retain)
- << " want " << cap_string(wanted)
- << dendl;
-
- for (map<int,InodeCap>::iterator it = in->caps.begin();
- it != in->caps.end();
- it++) {
- //if (it->second.caps & ~retain) {
- if (1) {
- // release (some of?) these caps
- it->second.caps = retain & it->second.caps;
- // note: tell mds _full_ wanted; it'll filter/behave based on what it is allowed to do
- MClientFileCaps *m = new MClientFileCaps(CEPH_CAP_OP_ACK,
- in->inode,
- it->second.seq,
- it->second.caps,
- wanted);
- messenger->send_message(m, mdsmap->get_inst(it->first));
- }
- if (wanted == 0)
- mds_sessions[it->first].num_caps--;
- }
- if (wanted == 0 && !in->caps.empty()) {
- in->caps.clear();
- put_inode(in);
- }
-}
-
-void Client::update_caps_wanted(Inode *in)
-{
- int wanted = in->file_caps_wanted();
- dout(5) << "updating caps wanted on ino " << in->inode.ino
- << " to " << cap_string(wanted)
- << " max_size " << in->wanted_max_size
- << dendl;
-
- // FIXME: pick a single mds and let the others off the hook..
- for (map<int,InodeCap>::iterator it = in->caps.begin();
- it != in->caps.end();
- it++) {
- MClientFileCaps *m = new MClientFileCaps(CEPH_CAP_OP_ACK,
- in->inode,
- it->second.seq,
- it->second.caps,
- wanted);
- m->set_max_size(in->wanted_max_size);
- in->requested_max_size = in->wanted_max_size;
- messenger->send_message(m, mdsmap->get_inst(it->first));
- if (wanted == 0)
- mds_sessions[it->first].num_caps--;
- }
- if (wanted == 0 && !in->caps.empty()) {
- in->caps.clear();
- put_inode(in);
- }
-}
-
// -------------------
in->fc.empty(new C_Client_CloseRelease(this, in));
} else {
dout(10) << "unmount residual caps on " << in->ino() << ", releasing" << dendl;
- release_caps(in);
+ check_caps(in);
}
}
}
Dentry *dn = lookup(path);
int want = CEPH_CAP_WR|CEPH_CAP_WRBUFFER|CEPH_CAP_EXCL;
if (dn && dn->inode &&
- (dn->inode->file_caps() & want) == want) {
+ (dn->inode->caps_issued() & want) == want) {
dout(5) << " have WR and EXCL caps, just updating our m/atime" << dendl;
dn->inode->inode.time_warp_seq++;
dn->inode->inode.mtime = mtime;
Inode *in = 0;
if (dn) {
in = dn->inode;
- in->add_open(cmode); // make note of pending open, since it effects _wanted_ caps.
+ in->get_open_ref(cmode); // make note of pending open, since it effects _wanted_ caps.
}
in = 0;
f->inode = in;
f->inode->get();
if (!dn)
- in->add_open(f->mode); // i may have alrady added it above!
+ in->get_open_ref(f->mode); // i may have alrady added it above!
dout(10) << in->inode.ino << " wr " << in->num_open_wr << " rd " << in->num_open_rd
<< " dirty " << in->fc.is_dirty() << " cached " << in->fc.is_cached() << dendl;
if (in->caps.count(mds) == 0)
mds_sessions[mds].num_caps++;
- int new_caps = reply->get_file_caps();
+ int new_caps = reply->get_caps_issued();
- assert(reply->get_file_caps_seq() >= in->caps[mds].seq);
- if (reply->get_file_caps_seq() > in->caps[mds].seq) {
- int old_caps = in->caps[mds].caps;
+ assert(reply->get_file_caps_seq() >= cap.seq);
+ if (reply->get_file_caps_seq() > cap.seq) {
+ int old_caps = cap.caps;
dout(7) << "open got caps " << cap_string(new_caps)
<< " (had " << cap_string(old_caps) << ")"
<< " from mds" << mds
<< dendl;
- in->caps[mds].caps = new_caps;
- in->caps[mds].seq = reply->get_file_caps_seq();
+ cap.caps = new_caps;
+ cap.seq = reply->get_file_caps_seq();
// we shouldn't ever lose caps at this point.
// actually, we might...?
- assert((old_caps & ~in->caps[mds].caps) == 0);
+ assert((old_caps & ~cap.caps) == 0);
if (g_conf.client_oc)
in->fc.set_caps(new_caps);
<< dendl;
}
- dout(5) << "open success, fh is " << f << " combined caps " << cap_string(in->file_caps()) << dendl;
+ dout(5) << "open success, fh is " << f << " combined caps " << cap_string(in->caps_issued()) << dendl;
}
delete reply;
Inode *in = f->inode;
// update inode rd/wr counts
- int before = in->file_caps_wanted();
- in->sub_open(f->mode);
- int after = in->file_caps_wanted();
+ int before = in->caps_wanted();
+ in->put_open_ref(f->mode);
+ int after = in->caps_wanted();
delete f;
// does this change what caps we want?
if (before != after && after)
- update_caps_wanted(in);
+ check_caps(in);
// release caps right away?
dout(10) << "num_open_rd " << in->num_open_rd << " num_open_wr " << in->num_open_wr << dendl;
if (lazy) {
// wait for lazy cap
- if ((in->file_caps() & CEPH_CAP_LAZYIO) == 0) {
+ if ((in->caps_issued() & CEPH_CAP_LAZYIO) == 0) {
dout(7) << " don't have lazy cap, waiting" << dendl;
Cond cond;
in->waitfor_lazy.push_back(&cond);
}
} else {
// wait for RD cap?
- while ((in->file_caps() & CEPH_CAP_RD) == 0) {
+ while ((in->caps_issued() & CEPH_CAP_RD) == 0) {
dout(7) << " don't have read cap, waiting" << dendl;
goto wait;
}
}
// async i/o?
- if ((in->file_caps() & (CEPH_CAP_WRBUFFER|CEPH_CAP_RDCACHE))) {
+ if ((in->caps_issued() & (CEPH_CAP_WRBUFFER|CEPH_CAP_RDCACHE))) {
// FIXME: this logic needs to move info FileCache!
unsafe_sync_write--;
in->uncommitted_writes--;
- if (in->uncommitted_writes == 0) {
- dout(15) << "sync_write_commit all sync writes committed on ino " << in->inode.ino << dendl;
- for (list<Cond*>::iterator p = in->waitfor_commit.begin();
- p != in->waitfor_commit.end();
- p++)
- (*p)->Signal();
- in->waitfor_commit.clear();
- }
+ cl->put_cap_ref(in, CEPH_CAP_WRBUFFER);
dout(15) << "sync_write_commit unsafe_sync_write = " << unsafe_sync_write << dendl;
if (unsafe_sync_write == 0 && unmounting) {
bufferlist blist;
blist.push_back( bp );
- if (g_conf.client_oc) { // buffer cache ON?
- assert(objectcacher);
-
- // write (this may block!)
- in->fc.write(offset, size, blist, client_lock);
-
- } else {
- // legacy, inconsistent synchronous write.
- dout(7) << "synchronous write" << dendl;
-
- // do we have write file cap?
- __u64 endoff = offset + size;
-
- if ((endoff >= in->inode.max_size ||
- endoff > (in->inode.size << 1)) &&
- endoff > in->wanted_max_size) {
- dout(10) << "wanted_max_size " << in->wanted_max_size << " -> " << endoff << dendl;
- in->wanted_max_size = endoff;
- update_caps_wanted(in);
- }
-
- while (!lazy &&
- ((in->file_caps() & CEPH_CAP_WR) == 0 ||
- endoff > in->inode.max_size)) {
- dout(7) << " don't have write cap for endoff " << endoff
- << " (max " << in->inode.max_size << "), waiting" << dendl;
- Cond cond;
- in->waitfor_write.push_back(&cond);
- cond.Wait(client_lock);
- }
- while (lazy && (in->file_caps() & CEPH_CAP_LAZYIO) == 0) {
- dout(7) << " don't have lazy cap, waiting" << dendl;
- Cond cond;
+ // request larger max_size?
+ __u64 endoff = offset + size;
+ if ((endoff >= in->inode.max_size ||
+ endoff > (in->inode.size << 1)) &&
+ endoff > in->wanted_max_size) {
+ dout(10) << "wanted_max_size " << in->wanted_max_size << " -> " << endoff << dendl;
+ in->wanted_max_size = endoff;
+ check_caps(in);
+ }
+
+ // wait for caps, max_size
+ while ((lazy && (in->caps_issued() & CEPH_CAP_LAZYIO) == 0) ||
+ (!lazy && (in->caps_issued() & CEPH_CAP_WR) == 0) ||
+ endoff > in->inode.max_size) {
+ dout(7) << "missing wr|lazy cap OR endoff " << endoff
+ << " > max_size " << in->inode.max_size
+ << ", waiting" << dendl;
+ Cond cond;
+ if (lazy)
in->waitfor_lazy.push_back(&cond);
- cond.Wait(client_lock);
- }
+ else
+ in->waitfor_write.push_back(&cond);
+ cond.Wait(client_lock);
+ }
- // avoid livelock with fsync
- if (in->uncommitted_writes > 0 &&
- !in->waitfor_commit.empty())
- _fsync(f, true);
+ in->get_cap_ref(CEPH_CAP_WR);
- // prepare write
+ // avoid livelock with fsync?
+
+ if (g_conf.client_oc) {
+ // buffer cache
+ in->fc.write(offset, size, blist, client_lock);
+ } else {
+ // simple, non-atomic sync write
Cond cond;
bool done = false;
Context *onfinish = new C_Cond(&cond, &done);
- in->get();
Context *onsafe = new C_Client_SyncCommit(this, in);
+
unsafe_sync_write++;
- in->sync_writes++;
-
- dout(20) << " sync write start " << onfinish << dendl;
+ in->get();
+ in->get_cap_ref(CEPH_CAP_WRBUFFER);
filer->write(in->inode, offset, size, blist, 0,
- onfinish, onsafe
- //, 1+((int)g_clock.now()) / 10 //f->pos // hack hack test osd revision snapshots
- );
+ onfinish, onsafe);
- while (!done) {
+ while (!done)
cond.Wait(client_lock);
- dout(20) << " sync write bump " << onfinish << dendl;
- }
-
- in->sync_writes--;
- if (in->sync_writes == 0 &&
- !in->waitfor_no_write.empty()) {
- for (list<Context*>::iterator i = in->waitfor_no_write.begin();
- i != in->waitfor_no_write.end();
- i++)
- (*i)->finish(0);
- in->waitfor_no_write.clear();
- }
-
- dout(20) << " sync write done " << onfinish << dendl;
}
// time
// mtime
in->inode.mtime = g_clock.real_now();
+ put_cap_ref(in, CEPH_CAP_WR);
+
// ok!
return totalwritten;
}
if (f->mode & CEPH_FILE_MODE_LAZY) {
// wait for lazy cap
- while ((in->file_caps() & CEPH_CAP_LAZYIO) == 0) {
+ while ((in->caps_issued() & CEPH_CAP_LAZYIO) == 0) {
dout(7) << " don't have lazy cap, waiting" << dendl;
Cond cond;
in->waitfor_lazy.push_back(&cond);
if (f->mode & CEPH_FILE_MODE_LAZY) {
// wait for lazy cap
- while ((in->file_caps() & CEPH_CAP_LAZYIO) == 0) {
+ while ((in->caps_issued() & CEPH_CAP_LAZYIO) == 0) {
dout(7) << " don't have lazy cap, waiting" << dendl;
Cond cond;
in->waitfor_lazy.push_back(&cond);
class InodeCap {
public:
- int caps;
+ unsigned issued;
+ unsigned implemented;
unsigned seq;
- InodeCap() : caps(0), seq(0) {}
+ InodeCap() : issued(0), implemented(0), seq(0) {}
};
map<int,InodeCap> caps; // mds -> InodeCap
map<int,InodeCap> stale_caps; // mds -> cap .. stale
- int num_open_rd, num_open_wr, num_open_lazy; // num readers, writers
+ int open_by_mode[CEPH_FILE_MODE_NUM];
+ map<int,int> cap_refs;
+
__u64 wanted_max_size, requested_max_size;
int ref; // ref count. 1 for each dentry, fh that links to me.
// for caching i/o mode
FileCache fc;
- // for sync i/o mode
- int sync_reads; // sync reads in progress
- int sync_writes; // sync writes in progress
- int uncommitted_writes; // sync writes missing commits
list<Cond*> waitfor_write;
list<Cond*> waitfor_read;
//inode(_inode),
lease_mask(0), lease_mds(-1),
dir_auth(-1), dir_hashed(false), dir_replicated(false),
- num_open_rd(0), num_open_wr(0), num_open_lazy(0),
wanted_max_size(0), requested_max_size(0),
ref(0), ll_ref(0),
dir(0), dn(0), symlink(0),
fc(_oc, ino, layout),
- sync_reads(0), sync_writes(0), uncommitted_writes(0),
hack_balance_reads(false)
{
+ memset(open_by_mode, 0, sizeof(int)*CEPH_FILE_MODE_NUM);
inode.ino = ino;
}
~Inode() {
bool is_dir() { return inode.is_dir(); }
- int file_caps() {
+
+ // CAPS --------
+ void get_open_ref(int mode) {
+ open_by_mode[mode]++;
+ }
+ bool put_open_ref(int mode) {
+ if (--open_by_mode[mode] == 0)
+ return true;
+ return false;
+ }
+
+ void get_cap_ref(int cap);
+ bool put_cap_ref(int cap);
+
+ int caps_issued() {
int c = 0;
for (map<int,InodeCap>::iterator it = caps.begin();
it != caps.end();
it++)
- c |= it->second.caps;
+ c |= it->second.issued;
for (map<int,InodeCap>::iterator it = stale_caps.begin();
it != stale_caps.end();
it++)
- c |= it->second.caps;
+ c |= it->second.issued;
return c;
}
- int file_caps_wanted() {
+ int caps_used() {
int w = 0;
- if (num_open_rd) w |= CEPH_CAP_RD|CEPH_CAP_RDCACHE;
- if (num_open_wr) w |= CEPH_CAP_WR|CEPH_CAP_WRBUFFER|CEPH_CAP_EXCL;
- if (num_open_lazy) w |= CEPH_CAP_LAZYIO;
- if (fc.is_dirty()) w |= CEPH_CAP_WRBUFFER|CEPH_CAP_EXCL;
- if (fc.is_cached()) w |= CEPH_CAP_RDCACHE;
+ for (map<int,int>::iterator p = cap_refs.begin();
+ p != cap_refs.end();
+ p++)
+ w |= p->first;
return w;
}
+ int caps_file_wanted() {
+ int want = 0;
+ for (int mode = 0; mode < 4; mode++)
+ if (open_by_mode[mode])
+ want |= ceph_caps_for_mode(mode);
+ return want;
+ }
+ int caps_wanted() {
+ int want = caps_file_wanted() | caps_used();
+ if (want & CEPH_CAP_WRBUFFER)
+ want |= CEPH_CAP_EXCL;
+ return want;
+ }
int get_effective_lease_mask(utime_t now) {
int havemask = 0;
if (now < lease_ttl && lease_mds >= 0)
havemask |= lease_mask;
- if (file_caps() & CEPH_CAP_EXCL)
+ if (caps_issued() & CEPH_CAP_EXCL)
havemask |= CEPH_LOCK_ICONTENT;
if (havemask & CEPH_LOCK_ICONTENT)
havemask |= CEPH_LOCK_ICONTENT; // hack: if we have one, we have both, for the purposes of below
bool have_valid_size() {
// RD+RDCACHE or WR+WRBUFFER => valid size
- if ((file_caps() & (CEPH_CAP_RD|CEPH_CAP_RDCACHE)) == (CEPH_CAP_RD|CEPH_CAP_RDCACHE))
+ if ((caps_issued() & (CEPH_CAP_RD|CEPH_CAP_RDCACHE)) == (CEPH_CAP_RD|CEPH_CAP_RDCACHE))
return true;
- if ((file_caps() & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) == (CEPH_CAP_WR|CEPH_CAP_WRBUFFER))
+ if ((caps_issued() & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) == (CEPH_CAP_WR|CEPH_CAP_WRBUFFER))
return true;
// otherwise, look for lease or EXCL...
if (get_effective_lease_mask(g_clock.now()) & CEPH_LOCK_ICONTENT)
return false;
}
- void add_open(int cmode) {
- if (cmode & CEPH_FILE_MODE_RD) num_open_rd++;
- if (cmode & CEPH_FILE_MODE_WR) num_open_wr++;
- if (cmode & CEPH_FILE_MODE_LAZY) num_open_lazy++;
- }
- void sub_open(int cmode) {
- if (cmode & CEPH_FILE_MODE_RD) num_open_rd--;
- if (cmode & CEPH_FILE_MODE_WR) num_open_wr--;
- if (cmode & CEPH_FILE_MODE_LAZY) num_open_lazy--;
- }
-
+
int authority(const string& dname) {
if (!dirfragtree.empty()) {
__gnu_cxx::hash<string> H;
// file caps
void handle_file_caps(class MClientFileCaps *m);
+ void check_caps(Inode *in);
+ void put_cap_ref(Inode *in, int cap);
+
+
void implemented_caps(class MClientFileCaps *m, Inode *in);
void release_caps(Inode *in, int retain=0);
- void update_caps_wanted(Inode *in);
+
+
void close_release(Inode *in);
void close_safe(Inode *in);