]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: revamp async truncation
authorSage Weil <sage@newdream.net>
Tue, 3 Mar 2009 00:38:11 +0000 (16:38 -0800)
committerSage Weil <sage@newdream.net>
Tue, 3 Mar 2009 00:38:15 +0000 (16:38 -0800)
We can learn about truncations from an mds reply or from a TRUNC
cap message.  Set up the async truncation in the generic
ceph_fill_file_size helper.  Remove old crufty async trunc code
(that assumed TRUNC would come before an ltruncate reply).

src/kernel/caps.c
src/kernel/inode.c
src/kernel/super.h

index 58f7fc86422352cbda23f6cc486432471622c683..adf91cfb013e876084a6693708200ea50b883c83 100644 (file)
@@ -1004,10 +1004,13 @@ int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got,
                }
        }
        have = __ceph_caps_issued(ci, &implemented);
-       /* HACK: force sync writes...
-       have &= ~CEPH_CAP_WRBUFFER;
-       implemented &= ~CEPH_CAP_WRBUFFER;
-       */
+
+       /*
+        * disallow writes while a truncate is pending
+        */
+       if (ci->i_truncate_pending)
+               have &= ~CEPH_CAP_FILE_WR;
+
        if ((have & need) == need) {
                /*
                 * Look at (implemented & ~have & not) so that we keep waiting
@@ -1274,12 +1277,13 @@ start:
        }
 
        /* size/ctime/mtime/atime? */
+       ceph_fill_file_size(inode, issued,
+                           le32_to_cpu(grant->truncate_seq),
+                           le64_to_cpu(grant->truncate_size), size);
        ceph_decode_timespec(&mtime, &grant->mtime);
        ceph_decode_timespec(&atime, &grant->atime);
        ceph_decode_timespec(&ctime, &grant->ctime);
-       ceph_fill_file_bits(inode, issued,
-                           le32_to_cpu(grant->truncate_seq),
-                           le64_to_cpu(grant->truncate_size), size,
+       ceph_fill_file_time(inode, issued,
                            le32_to_cpu(grant->time_warp_seq), &ctime, &mtime,
                            &atime);
 
@@ -1474,11 +1478,20 @@ static void handle_cap_trunc(struct inode *inode,
        struct ceph_inode_info *ci = ceph_inode(inode);
        int mds = session->s_mds;
        int seq = le32_to_cpu(trunc->seq);
+       u32 truncate_seq = le32_to_cpu(trunc->truncate_seq);
+       u64 truncate_size = le64_to_cpu(trunc->truncate_size);
        u64 size = le64_to_cpu(trunc->size);
+       int implemented = 0;
+       int dirty = __ceph_caps_dirty(ci);
+       int issued = __ceph_caps_issued(ceph_inode(inode), &implemented);
        int queue_trunc = 0;
+       
+       issued |= implemented | dirty;
 
-       dout(10, "handle_cap_trunc inode %p mds%d seq %d\n", inode, mds, seq);
-       queue_trunc = __ceph_queue_vmtruncate(inode, size);
+       dout(10, "handle_cap_trunc inode %p mds%d seq %d to %lld seq %d\n",
+            inode, mds, seq, truncate_size, truncate_seq);
+       queue_trunc = ceph_fill_file_size(inode, issued,
+                                         truncate_seq, truncate_size, size);
        spin_unlock(&inode->i_lock);
 
        if (queue_trunc)
index 79047c3736b877749cfe4942cdd412efdfe0076b..3c225559b67ed18574001a2a1cbb2f58c9a840a4 100644 (file)
@@ -274,6 +274,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
 
        ci->i_truncate_seq = 0;
        ci->i_truncate_size = 0;
+       ci->i_truncate_pending = 0;
 
        ci->i_max_size = 0;
        ci->i_reported_size = 0;
@@ -295,7 +296,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        INIT_WORK(&ci->i_wb_work, ceph_inode_writeback);
        INIT_WORK(&ci->i_pg_inv_work, ceph_inode_invalidate_pages);
 
-       ci->i_vmtruncate_to = -1;
        INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work);
 
        INIT_LIST_HEAD(&ci->i_listener_list);
@@ -323,31 +323,22 @@ void ceph_destroy_inode(struct inode *inode)
 
 
 /*
- * Helper to fill in size, ctime, mtime, and atime.  We have to be
+ * Helpers to fill in size, ctime, mtime, and atime.  We have to be
  * careful because either the client or MDS may have more up to date
  * info, depending on which capabilities are held, and whether
  * time_warp_seq or truncate_seq have increased.  Ordinarily, mtime
  * and size are monotonically increasing, except when utimes() or
  * truncate() increments the corresponding _seq values on the MDS.
  */
-int ceph_fill_file_bits(struct inode *inode, int issued,
-                       u32 truncate_seq, u64 truncate_size, u64 size,
-                       u64 time_warp_seq, struct timespec *ctime,
-                       struct timespec *mtime, struct timespec *atime)
+int ceph_fill_file_size(struct inode *inode, int issued,
+                       u32 truncate_seq, u64 truncate_size, u64 size)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       int warn = 0;
        int queue_trunc = 0;
 
        if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
            (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) {
                dout(10, "size %lld -> %llu\n", inode->i_size, size);
-
-               if (issued & (CEPH_CAP_FILE_RDCACHE|CEPH_CAP_FILE_RD|
-                             CEPH_CAP_FILE_WR|CEPH_CAP_FILE_WRBUFFER|
-                             CEPH_CAP_FILE_EXCL))
-                       queue_trunc = __ceph_queue_vmtruncate(inode, size);
-
                inode->i_size = size;
                inode->i_blocks = (size + (1<<9) - 1) >> 9;
                ci->i_reported_size = size;
@@ -355,6 +346,11 @@ int ceph_fill_file_bits(struct inode *inode, int issued,
                        dout(10, "truncate_seq %u -> %u\n",
                             ci->i_truncate_seq, truncate_seq);
                        ci->i_truncate_seq = truncate_seq;
+                       ci->i_truncate_pending++;
+                       if (issued & (CEPH_CAP_FILE_RDCACHE|CEPH_CAP_FILE_RD|
+                                     CEPH_CAP_FILE_WR|CEPH_CAP_FILE_WRBUFFER|
+                                     CEPH_CAP_FILE_EXCL))
+                               queue_trunc = 1;
                }
        }
        if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) >= 0 &&
@@ -363,6 +359,15 @@ int ceph_fill_file_bits(struct inode *inode, int issued,
                     truncate_size);
                ci->i_truncate_size = truncate_size;
        }
+       return queue_trunc;
+}
+
+void ceph_fill_file_time(struct inode *inode, int issued,
+                        u64 time_warp_seq, struct timespec *ctime,
+                        struct timespec *mtime, struct timespec *atime)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       int warn = 0;
 
        if (issued & (CEPH_CAP_FILE_EXCL|
                      CEPH_CAP_FILE_WR|
@@ -399,8 +404,6 @@ int ceph_fill_file_bits(struct inode *inode, int issued,
        if (warn) /* time_warp_seq shouldn't go backwards */
                dout(10, "%p mds time_warp_seq %llu < %u\n",
                     inode, time_warp_seq, ci->i_time_warp_seq);
-
-       return queue_trunc;
 }
 
 /*
@@ -475,12 +478,13 @@ static int fill_inode(struct inode *inode,
        ceph_decode_timespec(&atime, &info->atime);
        ceph_decode_timespec(&mtime, &info->mtime);
        ceph_decode_timespec(&ctime, &info->ctime);
-       queue_trunc = ceph_fill_file_bits(inode, issued,
+       queue_trunc = ceph_fill_file_size(inode, issued,
                                          le32_to_cpu(info->truncate_seq),
                                          le64_to_cpu(info->truncate_size),
-                                         le64_to_cpu(info->size),
-                                         le32_to_cpu(info->time_warp_seq),
-                                         &ctime, &mtime, &atime);
+                                         le64_to_cpu(info->size));
+       ceph_fill_file_time(inode, issued,
+                           le32_to_cpu(info->time_warp_seq),
+                           &ctime, &mtime, &atime);
 
        ci->i_max_size = le64_to_cpu(info->max_size);
        ci->i_layout = info->layout;
@@ -1384,37 +1388,6 @@ void ceph_vmtruncate_work(struct work_struct *work)
        iput(inode);
 }
 
-int __ceph_queue_vmtruncate(struct inode *inode, __u64 size)
-{
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       int queue_trunc = 0;
-
-       /*
-        * vmtruncate lazily; we can't block on i_mutex in the message
-        * handler path, or we deadlock against osd op replies needed
-        * to complete the writes holding i_lock.  vmtruncate will
-        * also block on page locks held by writes...
-        *
-        * if its an expansion, and there is no truncate pending, we
-        * don't need to truncate.
-        */
-       if (ci->i_vmtruncate_to < 0 && size > inode->i_size) {
-               dout(10, "clean fwd truncate, no vmtruncate needed\n");
-       } else if (ci->i_vmtruncate_to >= 0 && size >= ci->i_vmtruncate_to) {
-               dout(10, "trunc to %lld < %lld already queued\n",
-                    ci->i_vmtruncate_to, size);
-       } else {
-               /* we need to trunc even smaller */
-               dout(10, "queueing trunc %lld -> %lld\n", inode->i_size, size);
-               ci->i_vmtruncate_to = size;
-               queue_trunc = 1;
-       }
-       i_size_write(inode, size);
-       ci->i_reported_size = size;
-
-       return queue_trunc;
-}
-
 /*
  * called with i_mutex held.
  *
@@ -1424,23 +1397,33 @@ int __ceph_queue_vmtruncate(struct inode *inode, __u64 size)
 void __ceph_do_pending_vmtruncate(struct inode *inode)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       loff_t to;
-       int wrbuffer_refs;
+       u64 to;
+       int wrbuffer_refs, wake = 0;
 
        spin_lock(&inode->i_lock);
-       to = ci->i_vmtruncate_to;
-       ci->i_vmtruncate_to = -1;
+       if (ci->i_truncate_pending == 0) {
+               dout(10, "__do_pending_vmtruncate %p none pending\n", inode);
+               spin_unlock(&inode->i_lock);
+               return;
+       }
+       to = ci->i_truncate_size;
        wrbuffer_refs = ci->i_wrbuffer_ref;
+       dout(10, "__do_pending_vmtruncate %p (%d) to %lld\n", inode,
+            ci->i_truncate_pending, to);
        spin_unlock(&inode->i_lock);
 
-       if (to >= 0) {
-               dout(10, "__do_pending_vmtruncate %p to %lld\n", inode, to);
-               truncate_inode_pages(inode->i_mapping, to);
-               if (wrbuffer_refs == 0)
-                       ceph_check_caps(ci, 0, 0, NULL);
-       } else {
-               dout(10, "__do_pending_vmtruncate %p nothing to do\n", inode);
-       }
+       truncate_inode_pages(inode->i_mapping, to);
+
+       spin_lock(&inode->i_lock);
+       ci->i_truncate_pending--;
+       if (ci->i_truncate_pending == 0)
+               wake = 1;
+       spin_unlock(&inode->i_lock);
+
+       if (wrbuffer_refs == 0)
+               ceph_check_caps(ci, 0, 0, NULL);
+       if (wake)
+               wake_up(&ci->i_cap_wq);
 }
 
 /*
index a6b7cd9e1a83e3ce21cb670eb310651444b00778..89caebd7a1e5f56b8cc44dcb797044a2f582ae93 100644 (file)
@@ -263,8 +263,9 @@ struct ceph_inode_info {
 
        int i_nr_by_mode[CEPH_FILE_MODE_NUM];  /* open file counts */
 
-       u32 i_truncate_seq;
-       u64 i_truncate_size;
+       u32 i_truncate_seq;        /* last truncate to smaller size */
+       u64 i_truncate_size;       /*  and the size we last truncated down to */
+       int i_truncate_pending;    /*  still need to call vmtruncate */
 
        u64 i_max_size;            /* max file size authorized by mds */
        u64 i_reported_size; /* (max_)size reported to or requested of mds */
@@ -288,7 +289,6 @@ struct ceph_inode_info {
        struct work_struct i_wb_work;  /* writeback work */
        struct work_struct i_pg_inv_work;  /* page invalidation work */
 
-       loff_t i_vmtruncate_to;        /* delayed truncate work */
        struct work_struct i_vmtruncate_work;
 
        struct list_head i_listener_list; /* requests we pend on */
@@ -681,10 +681,11 @@ extern void ceph_destroy_inode(struct inode *inode);
 extern struct inode *ceph_get_inode(struct super_block *sb,
                                    struct ceph_vino vino);
 extern struct inode *ceph_get_snapdir(struct inode *parent);
-extern int ceph_fill_file_bits(struct inode *inode, int issued,
-                              u32 truncate_seq, u64 truncate_size, u64 size,
-                              u64 time_warp_seq, struct timespec *ctime,
-                              struct timespec *mtime, struct timespec *atime);
+extern int ceph_fill_file_size(struct inode *inode, int issued,
+                              u32 truncate_seq, u64 truncate_size, u64 size);
+extern void ceph_fill_file_time(struct inode *inode, int issued,
+                               u64 time_warp_seq, struct timespec *ctime,
+                               struct timespec *mtime, struct timespec *atime);
 extern int ceph_fill_trace(struct super_block *sb,
                           struct ceph_mds_request *req,
                           struct ceph_mds_session *session);
@@ -697,7 +698,6 @@ extern int ceph_dentry_lease_valid(struct dentry *dentry);
 extern void ceph_inode_set_size(struct inode *inode, loff_t size);
 extern void ceph_inode_writeback(struct work_struct *work);
 extern void ceph_vmtruncate_work(struct work_struct *work);
-extern int __ceph_queue_vmtruncate(struct inode *inode, __u64 size);
 extern void __ceph_do_pending_vmtruncate(struct inode *inode);
 
 extern int ceph_do_getattr(struct dentry *dentry, int mask);