]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: use vmtruncate; do cap trunc via wq to avoid i_mutex deadlock
authorSage Weil <sage@newdream.net>
Fri, 2 May 2008 16:59:15 +0000 (09:59 -0700)
committerSage Weil <sage@newdream.net>
Fri, 2 May 2008 16:59:15 +0000 (09:59 -0700)
src/kernel/inode.c
src/kernel/super.c
src/kernel/super.h
src/vstart.sh

index 1e9171ad53790a6304bcaf0c9d0ce8fb88cce060..f9e9fa1575cd3941b03e647b58c542bd58d4b3d7 100644 (file)
@@ -1061,57 +1061,61 @@ void ceph_inode_writeback(struct work_struct *work)
        write_inode_now(&ci->vfs_inode, 0);
 }
 
-static int __apply_truncate(struct inode *inode, loff_t size, int check_limit)
+/*
+ * called by setattr
+ */
+static int apply_truncate(struct inode *inode, loff_t size)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct address_space *mapping = inode->i_mapping;
-       unsigned long limit;
+       int rc;
+       
+       rc = vmtruncate(inode, size);
+       if (rc == 0) {
+               spin_lock(&inode->i_lock);
+               ci->i_reported_size = size;
+               spin_unlock(&inode->i_lock);
+       }
+       return rc;
+}
 
-       spin_lock(&inode->i_lock);
-       dout(10, "apply_truncate %p size %lld -> %llu\n", inode,
-            inode->i_size, size);
+/*
+ * called by trunc_wq; take i_mutex ourselves
+ */
+void ceph_vmtruncate_work(struct work_struct *work)
+{
+       struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
+                                                 i_vmtruncate_work);
+       struct inode *inode = &ci->vfs_inode;
 
-       if (inode->i_size < size)
-               goto do_expand;
-       i_size_write(inode, size);
-       ci->i_reported_size = size;
-       spin_unlock(&inode->i_lock);
+       dout(10, "vmtruncate_work %p\n", inode);
+       mutex_lock(&inode->i_mutex);
+       if (inode->i_size < ci->i_vmtruncate_from)
+               vmtruncate(inode, inode->i_size);
+       mutex_unlock(&inode->i_mutex);
+}
 
-       /* from fs/cifs */
-       unmap_mapping_range(mapping, size + PAGE_SIZE - 1, 0, 1);
-       truncate_inode_pages(mapping, size);
-       unmap_mapping_range(mapping, size + PAGE_SIZE - 1, 0, 1);
+static void apply_cap_truncate(struct inode *inode, loff_t size)
+{
+       struct ceph_client *client = ceph_client(inode->i_sb);
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       int queue = 0;
 
-       return 0;
-do_expand:
-       if (check_limit) {
-               limit = current->signal->rlim[RLIMIT_FSIZE].rlim_cur;
-               if (limit != RLIM_INFINITY && size > limit) {
-                       spin_unlock(&inode->i_lock);
-                       goto out_sig;
-               }
-               if (size > inode->i_sb->s_maxbytes) {
-                       spin_unlock(&inode->i_lock);
-                       return -EFBIG;
-               }
+       /*
+        * vmtruncate lazily; we can't block on i_mutex in the message
+        * handler path, or we deadlock against osd op replies needed
+        * to complete the writes holding the lock...
+        */
+       spin_lock(&inode->i_lock);
+       if (size < inode->i_size) {
+               ci->i_vmtruncate_from = inode->i_size;
+               queue = 1;
        }
        i_size_write(inode, size);
+       ci->i_reported_size = size;
        spin_unlock(&inode->i_lock);
 
-       return 0;
-out_sig:
-       send_sig(SIGXFSZ, current, 0);
-       return -EFBIG;
-}
-
-static int apply_truncate(struct inode *inode, loff_t size)
-{
-       return __apply_truncate(inode, size, 1);
-}
-
-static int apply_cap_truncate(struct inode *inode, loff_t size)
-{
-       return __apply_truncate(inode, size, 0);
+       if (queue)
+               queue_work(client->trunc_wq, &ci->i_vmtruncate_work);
 }
 
 int ceph_handle_cap_trunc(struct inode *inode, struct ceph_mds_file_caps *trunc,
@@ -1120,14 +1124,10 @@ int ceph_handle_cap_trunc(struct inode *inode, struct ceph_mds_file_caps *trunc,
        struct ceph_inode_info *ci = ceph_inode(inode);
        int mds = session->s_mds;
        int seq = le32_to_cpu(trunc->seq);
-       int err;
        u64 size = le64_to_cpu(trunc->size);
        dout(10, "handle_cap_trunc inode %p ci %p mds%d seq %d\n", inode, ci,
             mds, seq);
-       err = apply_cap_truncate(inode, size);
-       if (err)
-               return err;
-
+       apply_cap_truncate(inode, size);
        return 0;
 }
 
index 5e47c52e58568e09713e783506275f1fb37a5f4d..b57d47ed54bf24c3244a83156bb5de145720048d 100644 (file)
@@ -177,6 +177,7 @@ static struct inode *ceph_alloc_inode(struct super_block *sb)
        ci->i_hashval = 0;
 
        INIT_WORK(&ci->i_wb_work, ceph_inode_writeback);
+       INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work);
 
        return &ci->vfs_inode;
 }
@@ -552,6 +553,9 @@ struct ceph_client *ceph_create_client(struct ceph_mount_args *args,
        cl->wb_wq = create_workqueue("ceph-writeback");
        if (cl->wb_wq == 0)
                goto fail;
+       cl->trunc_wq = create_workqueue("ceph-trunc");
+       if (cl->trunc_wq == 0)
+               goto fail;
 
        /* messenger */
        if (args->flags & CEPH_MOUNT_MYIP)
@@ -598,6 +602,8 @@ void ceph_destroy_client(struct ceph_client *cl)
 #endif
        if (cl->wb_wq)
                destroy_workqueue(cl->wb_wq);
+       if (cl->trunc_wq)
+               destroy_workqueue(cl->trunc_wq);
        ceph_messenger_destroy(cl->msgr);
        put_client_counter();
        kfree(cl);
index 0e77e0163e4aa7a554e3ce74532d797435fec583..05afc0d35956704fe2d2b382a86f42f3d253ce4b 100644 (file)
@@ -120,6 +120,7 @@ struct ceph_client {
 
        /* writeback */
        struct workqueue_struct *wb_wq;
+       struct workqueue_struct *trunc_wq;
 
        struct kobject *client_kobj;
 
@@ -198,6 +199,9 @@ struct ceph_inode_info {
 
        struct work_struct i_wb_work;  /* writeback work */
 
+       loff_t i_vmtruncate_from;
+       struct work_struct i_vmtruncate_work;
+
        struct inode vfs_inode; /* at end */
 };
 
@@ -389,6 +393,7 @@ extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr);
 extern void ceph_check_caps(struct ceph_inode_info *ci, int is_delayed);
 extern void ceph_inode_set_size(struct inode *inode, loff_t size);
 extern void ceph_inode_writeback(struct work_struct *work);
+extern void ceph_vmtruncate_work(struct work_struct *work);
 
 extern int ceph_setattr(struct dentry *dentry, struct iattr *attr);
 extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
index ec9bd681c774fd9490dafd4ffe4b944984a4a653..3d19bdf22dbe36f4eae72073cc124518279cdbd2 100755 (executable)
@@ -39,7 +39,7 @@ $CEPH_BIN/cmonctl osd setmap -i .ceph_osdmap
 for osd in 0 #1 2 3 
 do
  $CEPH_BIN/cosd --mkfs_for_osd $osd dev/osd$osd  # initialize empty object store
- $CEPH_BIN/cosd $ARGS dev/osd$osd --debug_ms 1  #--debug_osd 40
+ $CEPH_BIN/cosd $ARGS dev/osd$osd --debug_ms 1 --debug_osd 10 --debug_fakestore 10 #--debug_osd 40
 done
 
 # mds