]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: delay cap release, and re-use on open
authorSage Weil <sage@newdream.net>
Tue, 15 Apr 2008 19:47:50 +0000 (12:47 -0700)
committerSage Weil <sage@newdream.net>
Tue, 15 Apr 2008 19:47:50 +0000 (12:47 -0700)
src/kernel/file.c
src/kernel/inode.c
src/kernel/mds_client.c
src/kernel/super.c
src/kernel/super.h

index 3169720601716f768d2fe4cb1c0284b7f6d1268a..71a70b0c99c997a2118900e56b28e611eb3b6b19 100644 (file)
@@ -65,6 +65,7 @@ static int ceph_init_file(struct inode *inode, struct file *file,
 
 int ceph_open(struct inode *inode, struct file *file)
 {
+       struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_client *client = ceph_sb_to_client(inode->i_sb);
        struct ceph_mds_client *mdsc = &client->mdsc;
        struct dentry *dentry = list_entry(inode->i_dentry.next, struct dentry,
@@ -75,6 +76,7 @@ int ceph_open(struct inode *inode, struct file *file)
 
        /* filter out O_CREAT|O_EXCL; vfs did that already.  yuck. */
        int flags = file->f_flags & ~(O_CREAT|O_EXCL);
+       int mode = ceph_file_mode(flags);
 
        dout(5, "open inode %p ino %llx file %p\n", inode,
             ceph_ino(inode), file);
@@ -83,6 +85,19 @@ int ceph_open(struct inode *inode, struct file *file)
                return 0;
        }
 
+       /* can we re-use existing caps? */
+       spin_lock(&inode->i_lock);
+       if ((__ceph_caps_issued(ci) & mode) == mode) {
+               dout(10, "open mode %d using existing caps on %p\n", 
+                    mode, inode);
+               spin_unlock(&inode->i_lock);
+               err = ceph_init_file(inode, file, flags);
+               BUG_ON(err); /* fixme */
+               return 0;
+       } 
+       spin_unlock(&inode->i_lock);
+       dout(10, "open mode %d, don't have caps\n", mode);
+
        req = prepare_open_request(inode->i_sb, dentry, flags, 0);
        if (IS_ERR(req))
                return PTR_ERR(req);
@@ -117,22 +132,6 @@ int ceph_lookup_open(struct inode *dir, struct dentry *dentry,
        dout(5, "ceph_lookup_open dentry %p '%.*s' flags %d mode 0%o\n", 
             dentry, dentry->d_name.len, dentry->d_name.name, flags, mode);
        
-       /* can we re-use an existing CAP_PIN? */
-       if (dentry->d_inode && S_ISDIR(dentry->d_inode->i_mode)) {
-               struct ceph_inode_info *ci = ceph_inode(dentry->d_inode);
-               spin_lock(&ci->vfs_inode.i_lock);
-               if (__ceph_caps_issued(ci) & CEPH_CAP_PIN) {
-                       dout(10, "using existing CAP_PIN on %p\n",
-                            dentry->d_inode);
-                       ci->i_nr_by_mode[FILE_MODE_PIN]++;
-                       spin_unlock(&ci->vfs_inode.i_lock);
-                       err = ceph_init_file(&ci->vfs_inode, file, flags);
-                       BUG_ON(err); /* fixme */
-                       return 0;
-               }
-               spin_unlock(&ci->vfs_inode.i_lock);
-       }
-
        /* do the open */
        req = prepare_open_request(dir->i_sb, dentry, flags, mode);
        if (IS_ERR(req))
@@ -296,7 +295,7 @@ ssize_t ceph_write(struct file *filp, const char __user *buf,
        }
        spin_unlock(&inode->i_lock);
        if (check)
-               ceph_check_caps(ci);
+               ceph_check_caps(ci, 1);
 
        dout(10, "write trying to get caps. i_size %llu\n", inode->i_size);
        ret = wait_event_interruptible(ci->i_cap_wq,
index 3bb3f6c426ca6495cd5b1246b03a4e0adeb14f47..1e09f9f4d0b34cf0c3c49fe70691731afb333c01 100644 (file)
@@ -756,11 +756,27 @@ void ceph_remove_cap(struct ceph_inode_cap *cap)
        iput(inode);
 }
 
+void ceph_cap_delayed_work(struct work_struct *work)
+{
+       struct ceph_inode_info *ci = container_of(work, 
+                                                 struct ceph_inode_info, 
+                                                 i_cap_dwork.work);
+       if (ci->i_hold_caps_until > jiffies) {
+               dout(10, "cap_dwork on %p -- rescheduling\n", &ci->vfs_inode);
+               schedule_delayed_work(&ci->i_cap_dwork, 
+                                     ci->i_hold_caps_until - jiffies);
+       } else {
+               dout(10, "cap_dwork on %p\n", &ci->vfs_inode);
+               ceph_check_caps(ci, 0);
+       }
+}
+
 /*
  * examine currently used, wanted versus held caps.
  *  release, ack revoked caps to mds as appropriate.
+ * @was_last if caller just dropped a cap ref, and we probably want to delay
  */
-void ceph_check_caps(struct ceph_inode_info *ci)
+void ceph_check_caps(struct ceph_inode_info *ci, int was_last)
 {
        struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
        struct ceph_mds_client *mdsc = &client->mdsc;
@@ -778,6 +794,18 @@ retry:
        spin_lock(&ci->vfs_inode.i_lock);
        wanted = __ceph_caps_wanted(ci);
        used = __ceph_caps_used(ci);
+       dout(10, "check_caps %p wanted %d used %d issued %d\n", &ci->vfs_inode, 
+            wanted, used, __ceph_caps_issued(ci));
+
+       if (was_last) {
+               unsigned long until = round_jiffies(jiffies + HZ * 5);
+               if (until > ci->i_hold_caps_until) {
+                       ci->i_hold_caps_until = until;
+                       dout(10, "hold_caps_until %lu\n", until);
+                       schedule_delayed_work(&ci->i_cap_dwork, 
+                                             until - jiffies);
+               }
+       }
 
        list_for_each(p, &ci->i_caps) {
                int revoking, dropping;
@@ -807,6 +835,12 @@ retry:
                
                if ((cap->issued & ~wanted) == 0)
                        continue;     /* nothing extra, all good */
+               
+               if (jiffies < ci->i_hold_caps_until) {
+                       /* delaying cap release for a bit */
+                       dout(30, "delaying cap release\n");
+                       continue;
+               }
 
        ack:
                /* take s_mutex, one way or another */
@@ -879,7 +913,7 @@ void ceph_inode_set_size(struct inode *inode, loff_t size)
        if ((size << 1) >= ci->i_max_size &&
            (ci->i_reported_size << 1) < ci->i_max_size) {
                spin_unlock(&inode->i_lock);
-               ceph_check_caps(ci);
+               ceph_check_caps(ci, 0);
        } else
                spin_unlock(&inode->i_lock);
 }
@@ -900,12 +934,14 @@ void ceph_put_mode(struct ceph_inode_info *ci, int mode)
        int last = 0;
 
        spin_lock(&ci->vfs_inode.i_lock);
+       dout(20, "put_mode %p mode %d %d -> %d\n", &ci->vfs_inode, mode,
+            ci->i_nr_by_mode[mode], ci->i_nr_by_mode[mode]-1);
        if (--ci->i_nr_by_mode[mode] == 0)
                last++;
        spin_unlock(&ci->vfs_inode.i_lock);
 
        if (last)
-               ceph_check_caps(ci);
+               ceph_check_caps(ci, 1);
 }
 
 
@@ -1163,7 +1199,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
             last ? "last":"");
        
        if (last) 
-               ceph_check_caps(ci);
+               ceph_check_caps(ci, 1);
 }
 
 void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr)
@@ -1181,7 +1217,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr)
             last ? "last":"");
        
        if (last) 
-               ceph_check_caps(ci);
+               ceph_check_caps(ci, 1);
 }
 
 
index 7fdfc4a5e37832ad25a4ef7fcada25ac8c6e71ea..b8da21d5aad7a67120a6e01fbe52dc81639302fd 100644 (file)
@@ -848,6 +848,7 @@ retry:
 
        /* send and wait */
        spin_unlock(&mdsc->lock);
+       dout(10, "do_request %p r_expects_cap=%d\n", req, req->r_expects_cap);
        send_msg_mds(mdsc, req->r_request, mds);
        wait_for_completion(&req->r_completion);
        spin_lock(&mdsc->lock);
@@ -898,6 +899,7 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
                dout(1, "handle_reply on unknown tid %llu\n", tid);
                return;
        }
+       dout(10, "handle_reply %p r_expects_cap=%d\n", req, req->r_expects_cap);
        mds = le32_to_cpu(msg->hdr.src.name.num);
        req->r_session = __get_session(mdsc, mds);
        BUG_ON(req->r_session == 0);
index cc8fd239c91b6ab1e59a5f5e7d8214e03fb77bc9..84adfbe61236be27afdf60f9adc47fab97563820 100644 (file)
@@ -150,10 +150,12 @@ static struct inode *ceph_alloc_inode(struct super_block *sb)
 
        ci->i_rd_ref = ci->i_rdcache_ref = 0;
        ci->i_wr_ref = ci->i_wrbuffer_ref = 0;
+       ci->i_hold_caps_until = 0;
 
        ci->i_hashval = 0;
 
        INIT_WORK(&ci->i_wb_work, ceph_inode_writeback);
+       INIT_DELAYED_WORK(&ci->i_cap_dwork, ceph_cap_delayed_work);
 
        return &ci->vfs_inode;
 }
index 0e44ee5a704742051f6d2eb8dec791edbc08b523..285efba53b4e6ff8d92bc33e4cafcc6ad3304bdd 100644 (file)
@@ -162,6 +162,7 @@ struct ceph_inode_info {
        struct list_head i_caps;
        struct ceph_inode_cap i_static_caps[STATIC_CAPS];
        wait_queue_head_t i_cap_wq;
+       unsigned long i_hold_caps_until; /* jiffies */
 
        int i_nr_by_mode[4];
        loff_t i_max_size;      /* size authorized by mds */
@@ -176,7 +177,7 @@ struct ceph_inode_info {
        unsigned long i_hashval;
 
        struct work_struct i_wb_work;  /* writeback work */
-       struct work_struct i_cap_work;  /* cap work */
+       struct delayed_work i_cap_dwork;  /* cap work */
 
        struct inode vfs_inode; /* at end */
 };
@@ -273,19 +274,34 @@ static inline int __ceph_caps_used(struct ceph_inode_info *ci)
        return used;
 }
 
+static inline int ceph_caps_for_mode(int mode)
+{
+       switch (mode) {
+       case FILE_MODE_PIN: 
+               return CEPH_CAP_PIN;
+       case FILE_MODE_RDONLY: 
+               return CEPH_CAP_PIN | 
+                       CEPH_CAP_RD | CEPH_CAP_RDCACHE;
+       case FILE_MODE_RDWR:
+               return CEPH_CAP_PIN | 
+                       CEPH_CAP_RD | CEPH_CAP_RDCACHE |
+                       CEPH_CAP_WR | CEPH_CAP_WRBUFFER |
+                       CEPH_CAP_EXCL;
+       case FILE_MODE_WRONLY:
+               return CEPH_CAP_PIN | 
+                       CEPH_CAP_WR | CEPH_CAP_WRBUFFER |
+                       CEPH_CAP_EXCL;
+       }
+       return 0;
+}
+
 static inline int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
 {
        int want = 0;
-       if (ci->i_nr_by_mode[0])
-               want |= CEPH_CAP_PIN;
-       if (ci->i_nr_by_mode[1])
-               want |= CEPH_CAP_RD|CEPH_CAP_RDCACHE;
-       if (ci->i_nr_by_mode[2])
-               want |= CEPH_CAP_RD|CEPH_CAP_RDCACHE|
-                       CEPH_CAP_WR|CEPH_CAP_WRBUFFER|
-                       CEPH_CAP_EXCL;
-       if (ci->i_nr_by_mode[3])
-               want |= CEPH_CAP_WR|CEPH_CAP_WRBUFFER|CEPH_CAP_EXCL;
+       int mode;
+       for (mode = 0; mode < 4; mode++)
+               if (ci->i_nr_by_mode[mode])
+                       want |= ceph_caps_for_mode(mode);
        return want;
 }
 
@@ -396,7 +412,8 @@ extern int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int
 extern void ceph_take_cap_refs(struct ceph_inode_info *ci, int got);
 extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
 extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr);
-extern void ceph_check_caps(struct ceph_inode_info *ci);
+extern void ceph_cap_delayed_work(struct work_struct *work);
+extern void ceph_check_caps(struct ceph_inode_info *ci, int was_last);
 extern void ceph_get_mode(struct ceph_inode_info *ci, int mode);
 extern void ceph_put_mode(struct ceph_inode_info *ci, int mode);
 extern void ceph_inode_set_size(struct inode *inode, loff_t size);