int ceph_open(struct inode *inode, struct file *file)
{
+ struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_client *client = ceph_sb_to_client(inode->i_sb);
struct ceph_mds_client *mdsc = &client->mdsc;
struct dentry *dentry = list_entry(inode->i_dentry.next, struct dentry,
/* filter out O_CREAT|O_EXCL; vfs did that already. yuck. */
int flags = file->f_flags & ~(O_CREAT|O_EXCL);
+ int mode = ceph_file_mode(flags);
dout(5, "open inode %p ino %llx file %p\n", inode,
ceph_ino(inode), file);
return 0;
}
+	/*
+	 * Can we re-use existing caps?  'mode' is a FILE_MODE_* value, not
+	 * a capability mask, so translate it with ceph_caps_for_mode()
+	 * before testing it against the caps currently issued: we may skip
+	 * the open request only if no cap for this mode is missing.
+	 */
+	spin_lock(&inode->i_lock);
+	if ((ceph_caps_for_mode(mode) & ~__ceph_caps_issued(ci)) == 0) {
+		dout(10, "open mode %d using existing caps on %p\n",
+		     mode, inode);
+		/* drop i_lock before calling ceph_init_file() */
+		spin_unlock(&inode->i_lock);
+		err = ceph_init_file(inode, file, flags);
+		BUG_ON(err); /* fixme */
+		return 0;
+	}
+	spin_unlock(&inode->i_lock);
+	dout(10, "open mode %d, don't have caps\n", mode);
+
req = prepare_open_request(inode->i_sb, dentry, flags, 0);
if (IS_ERR(req))
return PTR_ERR(req);
dout(5, "ceph_lookup_open dentry %p '%.*s' flags %d mode 0%o\n",
dentry, dentry->d_name.len, dentry->d_name.name, flags, mode);
- /* can we re-use an existing CAP_PIN? */
- if (dentry->d_inode && S_ISDIR(dentry->d_inode->i_mode)) {
- struct ceph_inode_info *ci = ceph_inode(dentry->d_inode);
- spin_lock(&ci->vfs_inode.i_lock);
- if (__ceph_caps_issued(ci) & CEPH_CAP_PIN) {
- dout(10, "using existing CAP_PIN on %p\n",
- dentry->d_inode);
- ci->i_nr_by_mode[FILE_MODE_PIN]++;
- spin_unlock(&ci->vfs_inode.i_lock);
- err = ceph_init_file(&ci->vfs_inode, file, flags);
- BUG_ON(err); /* fixme */
- return 0;
- }
- spin_unlock(&ci->vfs_inode.i_lock);
- }
-
/* do the open */
req = prepare_open_request(dir->i_sb, dentry, flags, mode);
if (IS_ERR(req))
}
spin_unlock(&inode->i_lock);
if (check)
- ceph_check_caps(ci);
+ ceph_check_caps(ci, 1);
dout(10, "write trying to get caps. i_size %llu\n", inode->i_size);
ret = wait_event_interruptible(ci->i_cap_wq,
iput(inode);
}
+/*
+ * Delayed-work handler for ci->i_cap_dwork.
+ *
+ * If the hold deadline (i_hold_caps_until) is still in the future, re-arm
+ * the work for the remaining time.  Otherwise call ceph_check_caps() with
+ * was_last == 0, so the deadline is not pushed out again.
+ */
+void ceph_cap_delayed_work(struct work_struct *work)
+{
+	struct ceph_inode_info *ci = container_of(work,
+						  struct ceph_inode_info,
+						  i_cap_dwork.work);
+	/*
+	 * Sample both values exactly once.  jiffies keeps advancing, so
+	 * re-reading it for the subtraction could observe a value past the
+	 * deadline and wrap the unsigned difference into a huge delay.
+	 */
+	unsigned long now = jiffies;
+	unsigned long until = ci->i_hold_caps_until;
+
+	/* time_after() remains correct across jiffies wraparound */
+	if (time_after(until, now)) {
+		dout(10, "cap_dwork on %p -- rescheduling\n", &ci->vfs_inode);
+		schedule_delayed_work(&ci->i_cap_dwork, until - now);
+	} else {
+		dout(10, "cap_dwork on %p\n", &ci->vfs_inode);
+		ceph_check_caps(ci, 0);
+	}
+}
+
/*
* examine currently used, wanted versus held caps.
* release, ack revoked caps to mds as appropriate.
+ * @was_last: nonzero if the caller just dropped a cap ref, in which case we
+ *            probably want to delay releasing the now-unwanted caps
*/
-void ceph_check_caps(struct ceph_inode_info *ci)
+void ceph_check_caps(struct ceph_inode_info *ci, int was_last)
{
struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
struct ceph_mds_client *mdsc = &client->mdsc;
spin_lock(&ci->vfs_inode.i_lock);
wanted = __ceph_caps_wanted(ci);
used = __ceph_caps_used(ci);
+ dout(10, "check_caps %p wanted %d used %d issued %d\n", &ci->vfs_inode,
+ wanted, used, __ceph_caps_issued(ci));
+
+ if (was_last) {
+ unsigned long until = round_jiffies(jiffies + HZ * 5);
+ if (until > ci->i_hold_caps_until) {
+ ci->i_hold_caps_until = until;
+ dout(10, "hold_caps_until %lu\n", until);
+ schedule_delayed_work(&ci->i_cap_dwork,
+ until - jiffies);
+ }
+ }
list_for_each(p, &ci->i_caps) {
int revoking, dropping;
if ((cap->issued & ~wanted) == 0)
continue; /* nothing extra, all good */
+
+ if (jiffies < ci->i_hold_caps_until) {
+ /* delaying cap release for a bit */
+ dout(30, "delaying cap release\n");
+ continue;
+ }
ack:
/* take s_mutex, one way or another */
if ((size << 1) >= ci->i_max_size &&
(ci->i_reported_size << 1) < ci->i_max_size) {
spin_unlock(&inode->i_lock);
- ceph_check_caps(ci);
+ ceph_check_caps(ci, 0);
} else
spin_unlock(&inode->i_lock);
}
int last = 0;
spin_lock(&ci->vfs_inode.i_lock);
+ dout(20, "put_mode %p mode %d %d -> %d\n", &ci->vfs_inode, mode,
+ ci->i_nr_by_mode[mode], ci->i_nr_by_mode[mode]-1);
if (--ci->i_nr_by_mode[mode] == 0)
last++;
spin_unlock(&ci->vfs_inode.i_lock);
if (last)
- ceph_check_caps(ci);
+ ceph_check_caps(ci, 1);
}
last ? "last":"");
if (last)
- ceph_check_caps(ci);
+ ceph_check_caps(ci, 1);
}
void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr)
last ? "last":"");
if (last)
- ceph_check_caps(ci);
+ ceph_check_caps(ci, 1);
}
/* send and wait */
spin_unlock(&mdsc->lock);
+ dout(10, "do_request %p r_expects_cap=%d\n", req, req->r_expects_cap);
send_msg_mds(mdsc, req->r_request, mds);
wait_for_completion(&req->r_completion);
spin_lock(&mdsc->lock);
dout(1, "handle_reply on unknown tid %llu\n", tid);
return;
}
+ dout(10, "handle_reply %p r_expects_cap=%d\n", req, req->r_expects_cap);
mds = le32_to_cpu(msg->hdr.src.name.num);
req->r_session = __get_session(mdsc, mds);
BUG_ON(req->r_session == 0);
ci->i_rd_ref = ci->i_rdcache_ref = 0;
ci->i_wr_ref = ci->i_wrbuffer_ref = 0;
+ ci->i_hold_caps_until = 0;
ci->i_hashval = 0;
INIT_WORK(&ci->i_wb_work, ceph_inode_writeback);
+ INIT_DELAYED_WORK(&ci->i_cap_dwork, ceph_cap_delayed_work);
return &ci->vfs_inode;
}
struct list_head i_caps;
struct ceph_inode_cap i_static_caps[STATIC_CAPS];
wait_queue_head_t i_cap_wq;
+ unsigned long i_hold_caps_until; /* jiffies */
int i_nr_by_mode[4];
loff_t i_max_size; /* size authorized by mds */
unsigned long i_hashval;
struct work_struct i_wb_work; /* writeback work */
- struct work_struct i_cap_work; /* cap work */
+ struct delayed_work i_cap_dwork; /* cap work */
struct inode vfs_inode; /* at end */
};
return used;
}
+/*
+ * Translate a file mode (FILE_MODE_*) into the capability bits that go
+ * with it.  Every known mode includes CEPH_CAP_PIN; an unrecognized mode
+ * yields 0.
+ */
+static inline int ceph_caps_for_mode(int mode)
+{
+	/* cap groups shared by the read-capable / write-capable modes */
+	const int rd_caps = CEPH_CAP_RD | CEPH_CAP_RDCACHE;
+	const int wr_caps = CEPH_CAP_WR | CEPH_CAP_WRBUFFER | CEPH_CAP_EXCL;
+
+	if (mode == FILE_MODE_PIN)
+		return CEPH_CAP_PIN;
+	if (mode == FILE_MODE_RDONLY)
+		return CEPH_CAP_PIN | rd_caps;
+	if (mode == FILE_MODE_RDWR)
+		return CEPH_CAP_PIN | rd_caps | wr_caps;
+	if (mode == FILE_MODE_WRONLY)
+		return CEPH_CAP_PIN | wr_caps;
+
+	return 0;
+}
+
static inline int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
{
int want = 0;
- if (ci->i_nr_by_mode[0])
- want |= CEPH_CAP_PIN;
- if (ci->i_nr_by_mode[1])
- want |= CEPH_CAP_RD|CEPH_CAP_RDCACHE;
- if (ci->i_nr_by_mode[2])
- want |= CEPH_CAP_RD|CEPH_CAP_RDCACHE|
- CEPH_CAP_WR|CEPH_CAP_WRBUFFER|
- CEPH_CAP_EXCL;
- if (ci->i_nr_by_mode[3])
- want |= CEPH_CAP_WR|CEPH_CAP_WRBUFFER|CEPH_CAP_EXCL;
+ int mode; /* index into ci->i_nr_by_mode[]; result is the union of caps for every mode in use */
+ for (mode = 0; mode < 4; mode++) /* 4 matches the declared size of i_nr_by_mode[] */
+ if (ci->i_nr_by_mode[mode]) /* nonzero count: this mode is in use */
+ want |= ceph_caps_for_mode(mode); /* OR in the caps mapped to that mode */
return want;
}
extern void ceph_take_cap_refs(struct ceph_inode_info *ci, int got);
extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr);
-extern void ceph_check_caps(struct ceph_inode_info *ci);
+extern void ceph_cap_delayed_work(struct work_struct *work);
+extern void ceph_check_caps(struct ceph_inode_info *ci, int was_last);
extern void ceph_get_mode(struct ceph_inode_info *ci, int mode);
extern void ceph_put_mode(struct ceph_inode_info *ci, int mode);
extern void ceph_inode_set_size(struct inode *inode, loff_t size);