From a7397d8ce6da154fdfce4d03d310fe7370c61ecb Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 29 Jan 2009 11:22:15 -0800 Subject: [PATCH] kclient: include truncate osd_ops when needed --- src/include/ceph_fs.h | 3 +- src/kernel/addr.c | 22 ++++++++------ src/kernel/caps.c | 3 +- src/kernel/file.c | 6 ++-- src/kernel/inode.c | 13 ++++++--- src/kernel/osd_client.c | 63 ++++++++++++++++++++++++++++++++--------- src/kernel/osd_client.h | 8 +++++- src/kernel/super.h | 15 ++++++---- 8 files changed, 97 insertions(+), 36 deletions(-) diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index c5942e9440a3c..381144f2a3ddc 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -842,7 +842,8 @@ struct ceph_mds_reply_inode { struct ceph_file_layout layout; struct ceph_timespec ctime, mtime, atime; __le32 time_warp_seq; - __le64 size, max_size, truncate_seq, truncate_size; + __le64 size, max_size, truncate_size; + __le32 truncate_seq; __le32 mode, uid, gid; __le32 nlink; __le64 files, subdirs, rbytes, rfiles, rsubdirs; /* dir stats */ diff --git a/src/kernel/addr.c b/src/kernel/addr.c index 5eb529b72e931..ad2e46d8363e8 100644 --- a/src/kernel/addr.c +++ b/src/kernel/addr.c @@ -230,7 +230,9 @@ static int readpage_nounlock(struct file *filp, struct page *page) dout(10, "readpage inode %p file %p page %p index %lu\n", inode, filp, page, page->index); err = ceph_osdc_readpage(osdc, ceph_vino(inode), &ci->i_layout, - page->index << PAGE_SHIFT, PAGE_SIZE, page); + page->index << PAGE_SHIFT, PAGE_SIZE, + ci->i_truncate_seq, ci->i_truncate_size, + page); if (unlikely(err < 0)) { SetPageError(page); goto out; @@ -278,6 +280,7 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, offset = page->index << PAGE_CACHE_SHIFT; rc = ceph_osdc_readpages(osdc, mapping, ceph_vino(inode), &ci->i_layout, offset, nr_pages << PAGE_CACHE_SHIFT, + ci->i_truncate_seq, ci->i_truncate_size, page_list, nr_pages); if (rc < 0) return rc; @@ -410,7 +413,9 @@ static 
int writepage_nounlock(struct page *page, struct writeback_control *wbc) set_page_writeback(page); err = ceph_osdc_writepages(osdc, ceph_vino(inode), &ci->i_layout, snapc, - page_off, len, &page, 1); + page_off, len, + ci->i_truncate_seq, ci->i_truncate_size, + &page, 1); if (err < 0) { dout(20, "writepage setting page error %p\n", page); SetPageError(page); @@ -700,12 +705,13 @@ get_more_pages: offset = page->index << PAGE_CACHE_SHIFT; len = wsize; req = ceph_osdc_new_request(&client->osdc, - &ci->i_layout, - ceph_vino(inode), - offset, &len, - CEPH_OSD_OP_WRITE, - snapc, - do_sync); + &ci->i_layout, + ceph_vino(inode), + offset, &len, + CEPH_OSD_OP_WRITE, + snapc, do_sync, + ci->i_truncate_seq, + ci->i_truncate_size); max_pages = req->r_num_pages; pages = req->r_pages; req->r_callback = writepages_finish; diff --git a/src/kernel/caps.c b/src/kernel/caps.c index e277b091aac4b..dc5dbae09ee87 100644 --- a/src/kernel/caps.c +++ b/src/kernel/caps.c @@ -1277,7 +1277,8 @@ start: ceph_decode_timespec(&atime, &grant->atime); ceph_decode_timespec(&ctime, &grant->ctime); ceph_fill_file_bits(inode, issued, - le32_to_cpu(grant->truncate_seq), size, + le32_to_cpu(grant->truncate_seq), + le64_to_cpu(grant->truncate_size), size, le32_to_cpu(grant->time_warp_seq), &ctime, &mtime, &atime); diff --git a/src/kernel/file.c b/src/kernel/file.c index 857b3d4f69e15..dafe2a49140ce 100644 --- a/src/kernel/file.c +++ b/src/kernel/file.c @@ -240,7 +240,8 @@ static ssize_t ceph_sync_read(struct file *file, char __user *data, ret = ceph_osdc_sync_read(&client->osdc, ceph_vino(inode), &ci->i_layout, - pos, count, data); + pos, count, ci->i_truncate_seq, + ci->i_truncate_size, data); if (ret > 0) *offset = pos + ret; return ret; @@ -267,7 +268,8 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data, ret = ceph_osdc_sync_write(&client->osdc, ceph_vino(inode), &ci->i_layout, ci->i_snap_realm->cached_context, - pos, count, data); + pos, count, ci->i_truncate_seq, + 
ci->i_truncate_size, data); if (ret > 0) { pos += ret; *offset = pos; diff --git a/src/kernel/inode.c b/src/kernel/inode.c index 95cbce0f240bb..7dbc270da4eb2 100644 --- a/src/kernel/inode.c +++ b/src/kernel/inode.c @@ -247,7 +247,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb) dout(10, "alloc_inode %p\n", &ci->vfs_inode); ci->i_version = 0; - ci->i_truncate_seq = 0; ci->i_time_warp_seq = 0; ci->i_ceph_flags = 0; ci->i_symlink = NULL; @@ -273,6 +272,9 @@ struct inode *ceph_alloc_inode(struct super_block *sb) for (i = 0; i < CEPH_FILE_MODE_NUM; i++) ci->i_nr_by_mode[i] = 0; + ci->i_truncate_seq = 0; + ci->i_truncate_size = 0; + ci->i_max_size = 0; ci->i_reported_size = 0; ci->i_wanted_max_size = 0; @@ -330,7 +332,7 @@ void ceph_destroy_inode(struct inode *inode) * truncate() increments the corresponding _seq values on the MDS. */ void ceph_fill_file_bits(struct inode *inode, int issued, - u64 truncate_seq, u64 size, + u32 truncate_seq, u64 truncate_size, u64 size, u64 time_warp_seq, struct timespec *ctime, struct timespec *mtime, struct timespec *atime) { @@ -345,6 +347,8 @@ void ceph_fill_file_bits(struct inode *inode, int issued, ci->i_reported_size = size; ci->i_truncate_seq = truncate_seq; } + if (truncate_seq >= ci->i_truncate_seq) + ci->i_truncate_size = truncate_size; if (issued & (CEPH_CAP_FILE_EXCL| CEPH_CAP_FILE_WR| @@ -379,7 +383,7 @@ void ceph_fill_file_bits(struct inode *inode, int issued, } } if (warn) /* time_warp_seq shouldn't go backwards */ - dout(10, "%p mds time_warp_seq %llu < %llu\n", + dout(10, "%p mds time_warp_seq %llu < %u\n", inode, time_warp_seq, ci->i_time_warp_seq); } @@ -455,7 +459,8 @@ static int fill_inode(struct inode *inode, ceph_decode_timespec(&mtime, &info->mtime); ceph_decode_timespec(&ctime, &info->ctime); ceph_fill_file_bits(inode, issued, - le64_to_cpu(info->truncate_seq), + le32_to_cpu(info->truncate_seq), + le64_to_cpu(info->truncate_size), le64_to_cpu(info->size), le32_to_cpu(info->time_warp_seq), &ctime, 
&mtime, &atime); diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c index b583831a64f10..3722cddeb27bd 100644 --- a/src/kernel/osd_client.c +++ b/src/kernel/osd_client.c @@ -90,13 +90,14 @@ void ceph_osdc_put_request(struct ceph_osd_request *req) */ static struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, short opc, struct ceph_snap_context *snapc, - int do_sync) + int do_sync, int do_trunc) { struct ceph_msg *req; struct ceph_osd_request_head *head; struct ceph_osd_op *op; __le64 *snaps; - size_t size = sizeof(*head) + (1 + do_sync)*sizeof(*op); + int num_op = 1 + do_sync + do_trunc; + size_t size = sizeof(*head) + num_op*sizeof(*op); int i; if (snapc) @@ -107,19 +108,24 @@ static struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, short opc, memset(req->front.iov_base, 0, req->front.iov_len); head = req->front.iov_base; op = (void *)(head + 1); - snaps = (void *)(op + 1); + snaps = (void *)(op + num_op); /* encode head */ head->client_inc = cpu_to_le32(1); /* always, for now. */ head->flags = 0; - head->num_ops = cpu_to_le16(1 + do_sync); + head->num_ops = cpu_to_le16(num_op); op->op = cpu_to_le16(opc); + if (do_trunc) { + op++; + op->op = cpu_to_le16(opc == CEPH_OSD_OP_READ ? + CEPH_OSD_OP_MASKTRUNC : CEPH_OSD_OP_SETTRUNC); + /* call set_trunc later */ + } if (do_sync) { op++; op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC); } - if (snapc) { head->snap_seq = cpu_to_le64(snapc->seq); head->num_snaps = cpu_to_le32(snapc->num_snaps); @@ -129,6 +135,21 @@ static struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, short opc, return req; } +/* + * Set truncate op's truncate_size relative to object offset, + * after we calculate the layout. 
+ */ +static void set_trunc(struct ceph_osd_request *req, u64 file_off, + u32 truncate_seq, u64 truncate_size) +{ + struct ceph_osd_request_head *head = req->r_request->front.iov_base; + struct ceph_osd_op *op = (void *)(head + 1); /* first op: the read/write extent */ + struct ceph_osd_op *top = op + 1; /* second op: the truncate op added by new_request_msg */ + + top->truncate_seq = cpu_to_le32(truncate_seq); + top->truncate_size = cpu_to_le64(truncate_size - (file_off - le64_to_cpu(op->offset))); +} + /* * build new request AND message, calculate layout, and adjust file * extent as needed. @@ -138,19 +159,22 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, struct ceph_vino vino, u64 off, u64 *plen, int op, struct ceph_snap_context *snapc, - int do_sync) + int do_sync, + u32 truncate_seq, + u64 truncate_size) { struct ceph_osd_request *req; struct ceph_msg *msg; int num_pages = calc_pages_for(off, *plen); struct ceph_osd_request_head *head; + int do_trunc = off + *plen > truncate_size; /* we may overallocate here, if our write extent is shortened below */ req = kzalloc(sizeof(*req) + num_pages*sizeof(void *), GFP_NOFS); if (req == NULL) return ERR_PTR(-ENOMEM); - msg = new_request_msg(osdc, op, snapc, do_sync); + msg = new_request_msg(osdc, op, snapc, do_sync, do_trunc); if (IS_ERR(msg)) { kfree(req); return ERR_PTR(PTR_ERR(msg)); @@ -160,6 +184,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, /* calculate max write size, pgid */ calc_layout(osdc, vino, layout, off, plen, req); + if (do_trunc) + set_trunc(req, off, truncate_seq, truncate_size); head = msg->front.iov_base; req->r_pgid.pg64 = le64_to_cpu(head->layout.ol_pgid); @@ -804,6 +830,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) int ceph_osdc_sync_read(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, u64 off, u64 len, + u32 truncate_seq, u64 truncate_size, char __user *data) { struct ceph_osd_request *req; @@ -816,7 +843,8 @@ int ceph_osdc_sync_read(struct ceph_osd_client *osdc, struct ceph_vino vino, more: req = 
ceph_osdc_new_request(osdc, layout, vino, off, &len, - CEPH_OSD_OP_READ, NULL, 0); + CEPH_OSD_OP_READ, NULL, 0, + truncate_seq, truncate_size); if (IS_ERR(req)) return PTR_ERR(req); @@ -880,6 +908,7 @@ out: int ceph_osdc_readpage(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, u64 off, u64 len, + u32 truncate_seq, u64 truncate_size, struct page *page) { struct ceph_osd_request *req; @@ -888,7 +917,8 @@ int ceph_osdc_readpage(struct ceph_osd_client *osdc, struct ceph_vino vino, dout(10, "readpage on ino %llx.%llx at %lld~%lld\n", vino.ino, vino.snap, off, len); req = ceph_osdc_new_request(osdc, layout, vino, off, &len, - CEPH_OSD_OP_READ, NULL, 0); + CEPH_OSD_OP_READ, NULL, 0, + truncate_seq, truncate_size); if (IS_ERR(req)) return PTR_ERR(req); BUG_ON(len != PAGE_CACHE_SIZE); @@ -911,6 +941,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, struct address_space *mapping, struct ceph_vino vino, struct ceph_file_layout *layout, u64 off, u64 len, + u32 truncate_seq, u64 truncate_size, struct list_head *page_list, int num_pages) { struct ceph_osd_request *req; @@ -932,7 +963,8 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, /* alloc request, w/ optimistically-sized page vector */ req = ceph_osdc_new_request(osdc, layout, vino, off, &len, - CEPH_OSD_OP_READ, NULL, 0); + CEPH_OSD_OP_READ, NULL, 0, + truncate_seq, truncate_size); if (IS_ERR(req)) return PTR_ERR(req); @@ -984,7 +1016,9 @@ out: int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, struct ceph_snap_context *snapc, - u64 off, u64 len, const char __user *data) + u64 off, u64 len, + u32 truncate_seq, u64 truncate_size, + const char __user *data) { struct ceph_msg *reqm; struct ceph_osd_request_head *reqhead; @@ -998,7 +1032,8 @@ int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino, more: req = ceph_osdc_new_request(osdc, layout, vino, off, &len, - CEPH_OSD_OP_WRITE, snapc, 0); 
+ CEPH_OSD_OP_WRITE, snapc, 0, + truncate_seq, truncate_size); if (IS_ERR(req)) return PTR_ERR(req); reqm = req->r_request; @@ -1068,6 +1103,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, struct ceph_snap_context *snapc, u64 off, u64 len, + u32 truncate_seq, u64 truncate_size, struct page **pages, int num_pages) { struct ceph_msg *reqm; @@ -1080,7 +1116,8 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, BUG_ON(vino.snap != CEPH_NOSNAP); req = ceph_osdc_new_request(osdc, layout, vino, off, &len, - CEPH_OSD_OP_WRITE, snapc, 0); + CEPH_OSD_OP_WRITE, snapc, 0, + truncate_seq, truncate_size); if (IS_ERR(req)) return PTR_ERR(req); reqm = req->r_request; diff --git a/src/kernel/osd_client.h b/src/kernel/osd_client.h index b132e82a60bb0..a812dcfcf040b 100644 --- a/src/kernel/osd_client.h +++ b/src/kernel/osd_client.h @@ -94,19 +94,22 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, struct ceph_vino vino, u64 offset, u64 *len, int op, struct ceph_snap_context *snapc, - int do_sync); + int do_sync, u32 truncate_seq, + u64 truncate_size); extern void ceph_osdc_put_request(struct ceph_osd_request *req); extern int ceph_osdc_readpage(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, u64 off, u64 len, + u32 truncate_seq, u64 truncate_size, struct page *page); extern int ceph_osdc_readpages(struct ceph_osd_client *osdc, struct address_space *mapping, struct ceph_vino vino, struct ceph_file_layout *layout, u64 off, u64 len, + u32 truncate_seq, u64 truncate_size, struct list_head *page_list, int nr_pages); extern int ceph_osdc_writepages(struct ceph_osd_client *osdc, @@ -114,6 +117,7 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_file_layout *layout, struct ceph_snap_context *sc, u64 off, u64 len, + u32 truncate_seq, u64 truncate_size, struct page **pagevec, int nr_pages); extern int 
ceph_osdc_writepages_start(struct ceph_osd_client *osdc, struct ceph_osd_request *req, @@ -124,12 +128,14 @@ extern int ceph_osdc_sync_read(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, u64 off, u64 len, + u32 truncate_seq, u64 truncate_size, char __user *data); extern int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, struct ceph_snap_context *sc, u64 off, u64 len, + u32 truncate_seq, u64 truncate_size, const char __user *data); #endif diff --git a/src/kernel/super.h b/src/kernel/super.h index 79946318c78a7..da6ec379292b4 100644 --- a/src/kernel/super.h +++ b/src/kernel/super.h @@ -211,7 +211,7 @@ struct ceph_inode_info { struct ceph_vino i_vino; /* ceph ino + snap */ u64 i_version; - u64 i_truncate_seq, i_time_warp_seq; + u32 i_time_warp_seq; unsigned i_ceph_flags; @@ -255,10 +255,13 @@ struct ceph_inode_info { int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */ - loff_t i_max_size; /* max file size authorized by mds */ - loff_t i_reported_size; /* (max_)size reported to or requested of mds */ - loff_t i_wanted_max_size; /* offset we'd like to write too */ - loff_t i_requested_max_size; /* max_size we've requested */ + u32 i_truncate_seq; + u64 i_truncate_size; + + u64 i_max_size; /* max file size authorized by mds */ + u64 i_reported_size; /* (max_)size reported to or requested of mds */ + u64 i_wanted_max_size; /* offset we'd like to write too */ + u64 i_requested_max_size; /* max_size we've requested */ struct timespec i_old_atime; @@ -673,7 +676,7 @@ extern struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino); extern struct inode *ceph_get_snapdir(struct inode *parent); extern void ceph_fill_file_bits(struct inode *inode, int issued, - u64 truncate_seq, u64 size, + u32 truncate_seq, u64 truncate_size, u64 size, u64 time_warp_seq, struct timespec *ctime, struct timespec *mtime, struct timespec *atime); extern int 
ceph_fill_trace(struct super_block *sb, -- 2.39.5