git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: include truncate osd_ops when needed
author: Sage Weil <sage@newdream.net>
Thu, 29 Jan 2009 19:22:15 +0000 (11:22 -0800)
committer: Sage Weil <sage@newdream.net>
Fri, 30 Jan 2009 19:44:33 +0000 (11:44 -0800)
src/include/ceph_fs.h
src/kernel/addr.c
src/kernel/caps.c
src/kernel/file.c
src/kernel/inode.c
src/kernel/osd_client.c
src/kernel/osd_client.h
src/kernel/super.h

index c5942e9440a3c69eea295cc2d274b9fb47afafea..381144f2a3ddca119485046290a93ec06468d7d7 100644 (file)
@@ -842,7 +842,8 @@ struct ceph_mds_reply_inode {
        struct ceph_file_layout layout;
        struct ceph_timespec ctime, mtime, atime;
        __le32 time_warp_seq;
-       __le64 size, max_size, truncate_seq, truncate_size;
+       __le64 size, max_size, truncate_size;
+       __le32 truncate_seq;
        __le32 mode, uid, gid;
        __le32 nlink;
        __le64 files, subdirs, rbytes, rfiles, rsubdirs;  /* dir stats */
index 5eb529b72e9314b938865d85f212e4db9a023fbc..ad2e46d8363e872fb21d31e95698a1e6111ae2c2 100644 (file)
@@ -230,7 +230,9 @@ static int readpage_nounlock(struct file *filp, struct page *page)
        dout(10, "readpage inode %p file %p page %p index %lu\n",
             inode, filp, page, page->index);
        err = ceph_osdc_readpage(osdc, ceph_vino(inode), &ci->i_layout,
-                                page->index << PAGE_SHIFT, PAGE_SIZE, page);
+                                page->index << PAGE_SHIFT, PAGE_SIZE,
+                                ci->i_truncate_seq, ci->i_truncate_size,
+                                page);
        if (unlikely(err < 0)) {
                SetPageError(page);
                goto out;
@@ -278,6 +280,7 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
        offset = page->index << PAGE_CACHE_SHIFT;
        rc = ceph_osdc_readpages(osdc, mapping, ceph_vino(inode), &ci->i_layout,
                                 offset, nr_pages << PAGE_CACHE_SHIFT,
+                                ci->i_truncate_seq, ci->i_truncate_size,
                                 page_list, nr_pages);
        if (rc < 0)
                return rc;
@@ -410,7 +413,9 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        set_page_writeback(page);
        err = ceph_osdc_writepages(osdc, ceph_vino(inode),
                                   &ci->i_layout, snapc,
-                                  page_off, len, &page, 1);
+                                  page_off, len,
+                                  ci->i_truncate_seq, ci->i_truncate_size,
+                                  &page, 1);
        if (err < 0) {
                dout(20, "writepage setting page error %p\n", page);
                SetPageError(page);
@@ -700,12 +705,13 @@ get_more_pages:
                                offset = page->index << PAGE_CACHE_SHIFT;
                                len = wsize;
                                req = ceph_osdc_new_request(&client->osdc,
-                                                           &ci->i_layout,
-                                                           ceph_vino(inode),
-                                                           offset, &len,
-                                                           CEPH_OSD_OP_WRITE,
-                                                           snapc,
-                                                           do_sync);
+                                                   &ci->i_layout,
+                                                   ceph_vino(inode),
+                                                   offset, &len,
+                                                   CEPH_OSD_OP_WRITE,
+                                                   snapc, do_sync,
+                                                   ci->i_truncate_seq,
+                                                   ci->i_truncate_size);
                                max_pages = req->r_num_pages;
                                pages = req->r_pages;
                                req->r_callback = writepages_finish;
index e277b091aac4b010b1a57c5815d2748ede29207f..dc5dbae09ee87a9c78c18b55761a334458987764 100644 (file)
@@ -1277,7 +1277,8 @@ start:
        ceph_decode_timespec(&atime, &grant->atime);
        ceph_decode_timespec(&ctime, &grant->ctime);
        ceph_fill_file_bits(inode, issued,
-                           le32_to_cpu(grant->truncate_seq), size,
+                           le32_to_cpu(grant->truncate_seq),
+                           le64_to_cpu(grant->truncate_size), size,
                            le32_to_cpu(grant->time_warp_seq), &ctime, &mtime,
                            &atime);
 
index 857b3d4f69e15dc9cca4460cfde4fba91fc80ff8..dafe2a49140ce5aa171a2bc5beda2a265423083c 100644 (file)
@@ -240,7 +240,8 @@ static ssize_t ceph_sync_read(struct file *file, char __user *data,
 
        ret = ceph_osdc_sync_read(&client->osdc, ceph_vino(inode),
                                  &ci->i_layout,
-                                 pos, count, data);
+                                 pos, count, ci->i_truncate_seq,
+                                 ci->i_truncate_size, data);
        if (ret > 0)
                *offset = pos + ret;
        return ret;
@@ -267,7 +268,8 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
        ret = ceph_osdc_sync_write(&client->osdc, ceph_vino(inode),
                                   &ci->i_layout,
                                   ci->i_snap_realm->cached_context,
-                                  pos, count, data);
+                                  pos, count, ci->i_truncate_seq,
+                                  ci->i_truncate_size, data);
        if (ret > 0) {
                pos += ret;
                *offset = pos;
index 95cbce0f240bbca9cfeea966ec891a225ede0dbc..7dbc270da4eb23d7a376a7e0de9664838a082172 100644 (file)
@@ -247,7 +247,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        dout(10, "alloc_inode %p\n", &ci->vfs_inode);
 
        ci->i_version = 0;
-       ci->i_truncate_seq = 0;
        ci->i_time_warp_seq = 0;
        ci->i_ceph_flags = 0;
        ci->i_symlink = NULL;
@@ -273,6 +272,9 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
                ci->i_nr_by_mode[i] = 0;
 
+       ci->i_truncate_seq = 0;
+       ci->i_truncate_size = 0;
+
        ci->i_max_size = 0;
        ci->i_reported_size = 0;
        ci->i_wanted_max_size = 0;
@@ -330,7 +332,7 @@ void ceph_destroy_inode(struct inode *inode)
  * truncate() increments the corresponding _seq values on the MDS.
  */
 void ceph_fill_file_bits(struct inode *inode, int issued,
-                        u64 truncate_seq, u64 size,
+                        u32 truncate_seq, u64 truncate_size, u64 size,
                         u64 time_warp_seq, struct timespec *ctime,
                         struct timespec *mtime, struct timespec *atime)
 {
@@ -345,6 +347,8 @@ void ceph_fill_file_bits(struct inode *inode, int issued,
                ci->i_reported_size = size;
                ci->i_truncate_seq = truncate_seq;
        }
+       if (truncate_seq >= ci->i_truncate_seq)
+               ci->i_truncate_size = truncate_size;
 
        if (issued & (CEPH_CAP_FILE_EXCL|
                      CEPH_CAP_FILE_WR|
@@ -379,7 +383,7 @@ void ceph_fill_file_bits(struct inode *inode, int issued,
                }
        }
        if (warn) /* time_warp_seq shouldn't go backwards */
-               dout(10, "%p mds time_warp_seq %llu < %llu\n",
+               dout(10, "%p mds time_warp_seq %llu < %u\n",
                     inode, time_warp_seq, ci->i_time_warp_seq);
 }
 
@@ -455,7 +459,8 @@ static int fill_inode(struct inode *inode,
        ceph_decode_timespec(&mtime, &info->mtime);
        ceph_decode_timespec(&ctime, &info->ctime);
        ceph_fill_file_bits(inode, issued,
-                           le64_to_cpu(info->truncate_seq),
+                           le32_to_cpu(info->truncate_seq),
+                           le64_to_cpu(info->truncate_size),
                            le64_to_cpu(info->size),
                            le32_to_cpu(info->time_warp_seq),
                            &ctime, &mtime, &atime);
index b583831a64f105a2dc42b36076f0b66f35763104..3722cddeb27bdea768dfe089fd3b8768080014ca 100644 (file)
@@ -90,13 +90,14 @@ void ceph_osdc_put_request(struct ceph_osd_request *req)
  */
 static struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, short opc,
                                        struct ceph_snap_context *snapc,
-                                       int do_sync)
+                                       int do_sync, int do_trunc)
 {
        struct ceph_msg *req;
        struct ceph_osd_request_head *head;
        struct ceph_osd_op *op;
        __le64 *snaps;
-       size_t size = sizeof(*head) + (1 + do_sync)*sizeof(*op);
+       int num_op = 1 + do_sync + do_trunc;
+       size_t size = sizeof(*head) + num_op*sizeof(*op);
        int i;
 
        if (snapc)
@@ -107,19 +108,24 @@ static struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, short opc,
        memset(req->front.iov_base, 0, req->front.iov_len);
        head = req->front.iov_base;
        op = (void *)(head + 1);
-       snaps = (void *)(op + 1);
+       snaps = (void *)(op + num_op);
 
        /* encode head */
        head->client_inc = cpu_to_le32(1); /* always, for now. */
        head->flags = 0;
-       head->num_ops = cpu_to_le16(1 + do_sync);
+       head->num_ops = cpu_to_le16(num_op);
        op->op = cpu_to_le16(opc);
 
+       if (do_trunc) {
+               op++;
+               op->op = cpu_to_le16(opc == CEPH_OSD_OP_READ ? 
+                            CEPH_OSD_OP_MASKTRUNC : CEPH_OSD_OP_SETTRUNC);
+               /* call set_trunc later */
+       }
        if (do_sync) {
                op++;
                op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC);
        }
-
        if (snapc) {
                head->snap_seq = cpu_to_le64(snapc->seq);
                head->num_snaps = cpu_to_le32(snapc->num_snaps);
@@ -129,6 +135,21 @@ static struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, short opc,
        return req;
 }
 
+/*
+ * Set truncate op's truncate_size relative to object offset,
+ * after we calculate the layout.
+ */
+static void set_trunc(struct ceph_osd_request *req, u64 file_off,
+                     u32 truncate_seq, u64 truncate_size)
+{
+       struct ceph_osd_request_head *head = req->r_request->front.iov_base;
+       struct ceph_osd_op *op = (void *)(head + 1);
+       struct ceph_osd_op *top = op + 1;
+
+       op->truncate_seq = truncate_seq;
+       op->truncate_size = truncate_size - (file_off - top->offset);
+}
+
 /*
  * build new request AND message, calculate layout, and adjust file
  * extent as needed.
@@ -138,19 +159,22 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                               struct ceph_vino vino,
                                               u64 off, u64 *plen, int op,
                                               struct ceph_snap_context *snapc,
-                                              int do_sync)
+                                              int do_sync,
+                                              u32 truncate_seq,
+                                              u64 truncate_size)
 {
        struct ceph_osd_request *req;
        struct ceph_msg *msg;
        int num_pages = calc_pages_for(off, *plen);
        struct ceph_osd_request_head *head;
+       int do_trunc = off + *plen > truncate_size;
 
        /* we may overallocate here, if our write extent is shortened below */
        req = kzalloc(sizeof(*req) + num_pages*sizeof(void *), GFP_NOFS);
        if (req == NULL)
                return ERR_PTR(-ENOMEM);
 
-       msg = new_request_msg(osdc, op, snapc, do_sync);
+       msg = new_request_msg(osdc, op, snapc, do_sync, do_trunc);
        if (IS_ERR(msg)) {
                kfree(req);
                return ERR_PTR(PTR_ERR(msg));
@@ -160,6 +184,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 
        /* calculate max write size, pgid */
        calc_layout(osdc, vino, layout, off, plen, req);
+       if (do_trunc)
+               set_trunc(req, off, truncate_seq, truncate_size);
 
        head = msg->front.iov_base;
        req->r_pgid.pg64 = le64_to_cpu(head->layout.ol_pgid);
@@ -804,6 +830,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
 int ceph_osdc_sync_read(struct ceph_osd_client *osdc, struct ceph_vino vino,
                        struct ceph_file_layout *layout,
                        u64 off, u64 len,
+                       u32 truncate_seq, u64 truncate_size,
                        char __user *data)
 {
        struct ceph_osd_request *req;
@@ -816,7 +843,8 @@ int ceph_osdc_sync_read(struct ceph_osd_client *osdc, struct ceph_vino vino,
 
 more:
        req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
-                                   CEPH_OSD_OP_READ, NULL, 0);
+                                   CEPH_OSD_OP_READ, NULL, 0,
+                                   truncate_seq, truncate_size);
        if (IS_ERR(req))
                return PTR_ERR(req);
 
@@ -880,6 +908,7 @@ out:
 int ceph_osdc_readpage(struct ceph_osd_client *osdc, struct ceph_vino vino,
                       struct ceph_file_layout *layout,
                       u64 off, u64 len,
+                      u32 truncate_seq, u64 truncate_size,
                       struct page *page)
 {
        struct ceph_osd_request *req;
@@ -888,7 +917,8 @@ int ceph_osdc_readpage(struct ceph_osd_client *osdc, struct ceph_vino vino,
        dout(10, "readpage on ino %llx.%llx at %lld~%lld\n", vino.ino,
             vino.snap, off, len);
        req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
-                                   CEPH_OSD_OP_READ, NULL, 0);
+                                   CEPH_OSD_OP_READ, NULL, 0,
+                                   truncate_seq, truncate_size);
        if (IS_ERR(req))
                return PTR_ERR(req);
        BUG_ON(len != PAGE_CACHE_SIZE);
@@ -911,6 +941,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                        struct address_space *mapping,
                        struct ceph_vino vino, struct ceph_file_layout *layout,
                        u64 off, u64 len,
+                       u32 truncate_seq, u64 truncate_size,
                        struct list_head *page_list, int num_pages)
 {
        struct ceph_osd_request *req;
@@ -932,7 +963,8 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
 
        /* alloc request, w/ optimistically-sized page vector */
        req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
-                                   CEPH_OSD_OP_READ, NULL, 0);
+                                   CEPH_OSD_OP_READ, NULL, 0,
+                                   truncate_seq, truncate_size);
        if (IS_ERR(req))
                return PTR_ERR(req);
 
@@ -984,7 +1016,9 @@ out:
 int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino,
                         struct ceph_file_layout *layout,
                         struct ceph_snap_context *snapc,
-                        u64 off, u64 len, const char __user *data)
+                        u64 off, u64 len,
+                        u32 truncate_seq, u64 truncate_size,
+                        const char __user *data)
 {
        struct ceph_msg *reqm;
        struct ceph_osd_request_head *reqhead;
@@ -998,7 +1032,8 @@ int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino,
 
 more:
        req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
-                                   CEPH_OSD_OP_WRITE, snapc, 0);
+                                   CEPH_OSD_OP_WRITE, snapc, 0,
+                                   truncate_seq, truncate_size);
        if (IS_ERR(req))
                return PTR_ERR(req);
        reqm = req->r_request;
@@ -1068,6 +1103,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                         struct ceph_file_layout *layout,
                         struct ceph_snap_context *snapc,
                         u64 off, u64 len,
+                        u32 truncate_seq, u64 truncate_size,
                         struct page **pages, int num_pages)
 {
        struct ceph_msg *reqm;
@@ -1080,7 +1116,8 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
        BUG_ON(vino.snap != CEPH_NOSNAP);
 
        req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
-                                   CEPH_OSD_OP_WRITE, snapc, 0);
+                                   CEPH_OSD_OP_WRITE, snapc, 0,
+                                   truncate_seq, truncate_size);
        if (IS_ERR(req))
                return PTR_ERR(req);
        reqm = req->r_request;
index b132e82a60bb0db244fa22304779e1bdc61b8882..a812dcfcf040b514a3a88c0700995a49c4cb4399 100644 (file)
@@ -94,19 +94,22 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
                                      struct ceph_vino vino,
                                      u64 offset, u64 *len, int op,
                                      struct ceph_snap_context *snapc,
-                                     int do_sync);
+                                     int do_sync, u32 truncate_eq,
+                                     u64 truncate_size);
 extern void ceph_osdc_put_request(struct ceph_osd_request *req);
 
 extern int ceph_osdc_readpage(struct ceph_osd_client *osdc,
                              struct ceph_vino vino,
                              struct ceph_file_layout *layout,
                              u64 off, u64 len,
+                             u32 truncate_seq, u64 truncate_size,
                              struct page *page);
 extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                               struct address_space *mapping,
                               struct ceph_vino vino,
                               struct ceph_file_layout *layout,
                               u64 off, u64 len,
+                              u32 truncate_seq, u64 truncate_size,
                               struct list_head *page_list, int nr_pages);
 
 extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
@@ -114,6 +117,7 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
                                struct ceph_file_layout *layout,
                                struct ceph_snap_context *sc,
                                u64 off, u64 len,
+                               u32 truncate_seq, u64 truncate_size,
                                struct page **pagevec, int nr_pages);
 extern int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
                                      struct ceph_osd_request *req,
@@ -124,12 +128,14 @@ extern int ceph_osdc_sync_read(struct ceph_osd_client *osdc,
                               struct ceph_vino vino,
                               struct ceph_file_layout *layout,
                               u64 off, u64 len,
+                              u32 truncate_seq, u64 truncate_size,
                               char __user *data);
 extern int ceph_osdc_sync_write(struct ceph_osd_client *osdc,
                                struct ceph_vino vino,
                                struct ceph_file_layout *layout,
                                struct ceph_snap_context *sc,
                                u64 off, u64 len,
+                               u32 truncate_seq, u64 truncate_size,
                                const char __user *data);
 
 #endif
index 79946318c78a7ba0b77003f117f88a1a3783acdd..da6ec379292b4e2726e00cdd7c4e2f296b4997aa 100644 (file)
@@ -211,7 +211,7 @@ struct ceph_inode_info {
        struct ceph_vino i_vino;   /* ceph ino + snap */
 
        u64 i_version;
-       u64 i_truncate_seq, i_time_warp_seq;
+       u32 i_time_warp_seq;
 
        unsigned i_ceph_flags;
 
@@ -255,10 +255,13 @@ struct ceph_inode_info {
 
        int i_nr_by_mode[CEPH_FILE_MODE_NUM];  /* open file counts */
 
-       loff_t i_max_size;            /* max file size authorized by mds */
-       loff_t i_reported_size; /* (max_)size reported to or requested of mds */
-       loff_t i_wanted_max_size;     /* offset we'd like to write too */
-       loff_t i_requested_max_size;  /* max_size we've requested */
+       u32 i_truncate_seq;
+       u64 i_truncate_size;
+
+       u64 i_max_size;            /* max file size authorized by mds */
+       u64 i_reported_size; /* (max_)size reported to or requested of mds */
+       u64 i_wanted_max_size;     /* offset we'd like to write too */
+       u64 i_requested_max_size;  /* max_size we've requested */
 
        struct timespec i_old_atime;
 
@@ -673,7 +676,7 @@ extern struct inode *ceph_get_inode(struct super_block *sb,
                                    struct ceph_vino vino);
 extern struct inode *ceph_get_snapdir(struct inode *parent);
 extern void ceph_fill_file_bits(struct inode *inode, int issued,
-                               u64 truncate_seq, u64 size,
+                               u32 truncate_seq, u64 truncate_size, u64 size,
                                u64 time_warp_seq, struct timespec *ctime,
                                struct timespec *mtime, struct timespec *atime);
 extern int ceph_fill_trace(struct super_block *sb,