struct ceph_file_layout layout;
struct ceph_timespec ctime, mtime, atime;
__le32 time_warp_seq;
- __le64 size, max_size, truncate_seq, truncate_size;
+ __le64 size, max_size, truncate_size;
+ __le32 truncate_seq;
__le32 mode, uid, gid;
__le32 nlink;
__le64 files, subdirs, rbytes, rfiles, rsubdirs; /* dir stats */
dout(10, "readpage inode %p file %p page %p index %lu\n",
inode, filp, page, page->index);
err = ceph_osdc_readpage(osdc, ceph_vino(inode), &ci->i_layout,
- page->index << PAGE_SHIFT, PAGE_SIZE, page);
+ page->index << PAGE_SHIFT, PAGE_SIZE,
+ ci->i_truncate_seq, ci->i_truncate_size,
+ page);
if (unlikely(err < 0)) {
SetPageError(page);
goto out;
offset = page->index << PAGE_CACHE_SHIFT;
rc = ceph_osdc_readpages(osdc, mapping, ceph_vino(inode), &ci->i_layout,
offset, nr_pages << PAGE_CACHE_SHIFT,
+ ci->i_truncate_seq, ci->i_truncate_size,
page_list, nr_pages);
if (rc < 0)
return rc;
set_page_writeback(page);
err = ceph_osdc_writepages(osdc, ceph_vino(inode),
&ci->i_layout, snapc,
- page_off, len, &page, 1);
+ page_off, len,
+ ci->i_truncate_seq, ci->i_truncate_size,
+ &page, 1);
if (err < 0) {
dout(20, "writepage setting page error %p\n", page);
SetPageError(page);
offset = page->index << PAGE_CACHE_SHIFT;
len = wsize;
req = ceph_osdc_new_request(&client->osdc,
- &ci->i_layout,
- ceph_vino(inode),
- offset, &len,
- CEPH_OSD_OP_WRITE,
- snapc,
- do_sync);
+ &ci->i_layout,
+ ceph_vino(inode),
+ offset, &len,
+ CEPH_OSD_OP_WRITE,
+ snapc, do_sync,
+ ci->i_truncate_seq,
+ ci->i_truncate_size);
max_pages = req->r_num_pages;
pages = req->r_pages;
req->r_callback = writepages_finish;
ceph_decode_timespec(&atime, &grant->atime);
ceph_decode_timespec(&ctime, &grant->ctime);
ceph_fill_file_bits(inode, issued,
- le32_to_cpu(grant->truncate_seq), size,
+ le32_to_cpu(grant->truncate_seq),
+ le64_to_cpu(grant->truncate_size), size,
le32_to_cpu(grant->time_warp_seq), &ctime, &mtime,
&atime);
ret = ceph_osdc_sync_read(&client->osdc, ceph_vino(inode),
&ci->i_layout,
- pos, count, data);
+ pos, count, ci->i_truncate_seq,
+ ci->i_truncate_size, data);
if (ret > 0)
*offset = pos + ret;
return ret;
ret = ceph_osdc_sync_write(&client->osdc, ceph_vino(inode),
&ci->i_layout,
ci->i_snap_realm->cached_context,
- pos, count, data);
+ pos, count, ci->i_truncate_seq,
+ ci->i_truncate_size, data);
if (ret > 0) {
pos += ret;
*offset = pos;
dout(10, "alloc_inode %p\n", &ci->vfs_inode);
ci->i_version = 0;
- ci->i_truncate_seq = 0;
ci->i_time_warp_seq = 0;
ci->i_ceph_flags = 0;
ci->i_symlink = NULL;
for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
ci->i_nr_by_mode[i] = 0;
+ ci->i_truncate_seq = 0;
+ ci->i_truncate_size = 0;
+
ci->i_max_size = 0;
ci->i_reported_size = 0;
ci->i_wanted_max_size = 0;
* truncate() increments the corresponding _seq values on the MDS.
*/
void ceph_fill_file_bits(struct inode *inode, int issued,
- u64 truncate_seq, u64 size,
+ u32 truncate_seq, u64 truncate_size, u64 size,
u64 time_warp_seq, struct timespec *ctime,
struct timespec *mtime, struct timespec *atime)
{
ci->i_reported_size = size;
ci->i_truncate_seq = truncate_seq;
}
+ if (truncate_seq >= ci->i_truncate_seq)
+ ci->i_truncate_size = truncate_size;
if (issued & (CEPH_CAP_FILE_EXCL|
CEPH_CAP_FILE_WR|
}
}
if (warn) /* time_warp_seq shouldn't go backwards */
- dout(10, "%p mds time_warp_seq %llu < %llu\n",
+ dout(10, "%p mds time_warp_seq %llu < %u\n",
inode, time_warp_seq, ci->i_time_warp_seq);
}
ceph_decode_timespec(&mtime, &info->mtime);
ceph_decode_timespec(&ctime, &info->ctime);
ceph_fill_file_bits(inode, issued,
- le64_to_cpu(info->truncate_seq),
+ le32_to_cpu(info->truncate_seq),
+ le64_to_cpu(info->truncate_size),
le64_to_cpu(info->size),
le32_to_cpu(info->time_warp_seq),
&ctime, &mtime, &atime);
*/
static struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, short opc,
struct ceph_snap_context *snapc,
- int do_sync)
+ int do_sync, int do_trunc)
{
struct ceph_msg *req;
struct ceph_osd_request_head *head;
struct ceph_osd_op *op;
__le64 *snaps;
- size_t size = sizeof(*head) + (1 + do_sync)*sizeof(*op);
+ int num_op = 1 + do_sync + do_trunc;
+ size_t size = sizeof(*head) + num_op*sizeof(*op);
int i;
if (snapc)
memset(req->front.iov_base, 0, req->front.iov_len);
head = req->front.iov_base;
op = (void *)(head + 1);
- snaps = (void *)(op + 1);
+ snaps = (void *)(op + num_op);
/* encode head */
head->client_inc = cpu_to_le32(1); /* always, for now. */
head->flags = 0;
- head->num_ops = cpu_to_le16(1 + do_sync);
+ head->num_ops = cpu_to_le16(num_op);
op->op = cpu_to_le16(opc);
+ if (do_trunc) {
+ op++;
+ op->op = cpu_to_le16(opc == CEPH_OSD_OP_READ ?
+ CEPH_OSD_OP_MASKTRUNC : CEPH_OSD_OP_SETTRUNC);
+ /* call set_trunc later */
+ }
if (do_sync) {
op++;
op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC);
}
-
if (snapc) {
head->snap_seq = cpu_to_le64(snapc->seq);
head->num_snaps = cpu_to_le32(snapc->num_snaps);
return req;
}
+/*
+ * Set truncate op's truncate_size relative to object offset,
+ * after we calculate the layout.
+ */
+static void set_trunc(struct ceph_osd_request *req, u64 file_off,
+ u32 truncate_seq, u64 truncate_size)
+{
+ struct ceph_osd_request_head *head = req->r_request->front.iov_base;
+ struct ceph_osd_op *op = (void *)(head + 1);
+ struct ceph_osd_op *top = op + 1;
+
+ op->truncate_seq = truncate_seq;
+ op->truncate_size = truncate_size - (file_off - top->offset);
+}
+
/*
* build new request AND message, calculate layout, and adjust file
* extent as needed.
struct ceph_vino vino,
u64 off, u64 *plen, int op,
struct ceph_snap_context *snapc,
- int do_sync)
+ int do_sync,
+ u32 truncate_seq,
+ u64 truncate_size)
{
struct ceph_osd_request *req;
struct ceph_msg *msg;
int num_pages = calc_pages_for(off, *plen);
struct ceph_osd_request_head *head;
+ int do_trunc = off + *plen > truncate_size;
/* we may overallocate here, if our write extent is shortened below */
req = kzalloc(sizeof(*req) + num_pages*sizeof(void *), GFP_NOFS);
if (req == NULL)
return ERR_PTR(-ENOMEM);
- msg = new_request_msg(osdc, op, snapc, do_sync);
+ msg = new_request_msg(osdc, op, snapc, do_sync, do_trunc);
if (IS_ERR(msg)) {
kfree(req);
return ERR_PTR(PTR_ERR(msg));
/* calculate max write size, pgid */
calc_layout(osdc, vino, layout, off, plen, req);
+ if (do_trunc)
+ set_trunc(req, off, truncate_seq, truncate_size);
head = msg->front.iov_base;
req->r_pgid.pg64 = le64_to_cpu(head->layout.ol_pgid);
int ceph_osdc_sync_read(struct ceph_osd_client *osdc, struct ceph_vino vino,
struct ceph_file_layout *layout,
u64 off, u64 len,
+ u32 truncate_seq, u64 truncate_size,
char __user *data)
{
struct ceph_osd_request *req;
more:
req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
- CEPH_OSD_OP_READ, NULL, 0);
+ CEPH_OSD_OP_READ, NULL, 0,
+ truncate_seq, truncate_size);
if (IS_ERR(req))
return PTR_ERR(req);
int ceph_osdc_readpage(struct ceph_osd_client *osdc, struct ceph_vino vino,
struct ceph_file_layout *layout,
u64 off, u64 len,
+ u32 truncate_seq, u64 truncate_size,
struct page *page)
{
struct ceph_osd_request *req;
dout(10, "readpage on ino %llx.%llx at %lld~%lld\n", vino.ino,
vino.snap, off, len);
req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
- CEPH_OSD_OP_READ, NULL, 0);
+ CEPH_OSD_OP_READ, NULL, 0,
+ truncate_seq, truncate_size);
if (IS_ERR(req))
return PTR_ERR(req);
BUG_ON(len != PAGE_CACHE_SIZE);
struct address_space *mapping,
struct ceph_vino vino, struct ceph_file_layout *layout,
u64 off, u64 len,
+ u32 truncate_seq, u64 truncate_size,
struct list_head *page_list, int num_pages)
{
struct ceph_osd_request *req;
/* alloc request, w/ optimistically-sized page vector */
req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
- CEPH_OSD_OP_READ, NULL, 0);
+ CEPH_OSD_OP_READ, NULL, 0,
+ truncate_seq, truncate_size);
if (IS_ERR(req))
return PTR_ERR(req);
int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino,
struct ceph_file_layout *layout,
struct ceph_snap_context *snapc,
- u64 off, u64 len, const char __user *data)
+ u64 off, u64 len,
+ u32 truncate_seq, u64 truncate_size,
+ const char __user *data)
{
struct ceph_msg *reqm;
struct ceph_osd_request_head *reqhead;
more:
req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
- CEPH_OSD_OP_WRITE, snapc, 0);
+ CEPH_OSD_OP_WRITE, snapc, 0,
+ truncate_seq, truncate_size);
if (IS_ERR(req))
return PTR_ERR(req);
reqm = req->r_request;
struct ceph_file_layout *layout,
struct ceph_snap_context *snapc,
u64 off, u64 len,
+ u32 truncate_seq, u64 truncate_size,
struct page **pages, int num_pages)
{
struct ceph_msg *reqm;
BUG_ON(vino.snap != CEPH_NOSNAP);
req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
- CEPH_OSD_OP_WRITE, snapc, 0);
+ CEPH_OSD_OP_WRITE, snapc, 0,
+ truncate_seq, truncate_size);
if (IS_ERR(req))
return PTR_ERR(req);
reqm = req->r_request;
struct ceph_vino vino,
u64 offset, u64 *len, int op,
struct ceph_snap_context *snapc,
- int do_sync);
+ int do_sync, u32 truncate_eq,
+ u64 truncate_size);
extern void ceph_osdc_put_request(struct ceph_osd_request *req);
extern int ceph_osdc_readpage(struct ceph_osd_client *osdc,
struct ceph_vino vino,
struct ceph_file_layout *layout,
u64 off, u64 len,
+ u32 truncate_seq, u64 truncate_size,
struct page *page);
extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
struct address_space *mapping,
struct ceph_vino vino,
struct ceph_file_layout *layout,
u64 off, u64 len,
+ u32 truncate_seq, u64 truncate_size,
struct list_head *page_list, int nr_pages);
extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
struct ceph_file_layout *layout,
struct ceph_snap_context *sc,
u64 off, u64 len,
+ u32 truncate_seq, u64 truncate_size,
struct page **pagevec, int nr_pages);
extern int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
struct ceph_osd_request *req,
struct ceph_vino vino,
struct ceph_file_layout *layout,
u64 off, u64 len,
+ u32 truncate_seq, u64 truncate_size,
char __user *data);
extern int ceph_osdc_sync_write(struct ceph_osd_client *osdc,
struct ceph_vino vino,
struct ceph_file_layout *layout,
struct ceph_snap_context *sc,
u64 off, u64 len,
+ u32 truncate_seq, u64 truncate_size,
const char __user *data);
#endif
struct ceph_vino i_vino; /* ceph ino + snap */
u64 i_version;
- u64 i_truncate_seq, i_time_warp_seq;
+ u32 i_time_warp_seq;
unsigned i_ceph_flags;
int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */
- loff_t i_max_size; /* max file size authorized by mds */
- loff_t i_reported_size; /* (max_)size reported to or requested of mds */
- loff_t i_wanted_max_size; /* offset we'd like to write too */
- loff_t i_requested_max_size; /* max_size we've requested */
+ u32 i_truncate_seq;
+ u64 i_truncate_size;
+
+ u64 i_max_size; /* max file size authorized by mds */
+ u64 i_reported_size; /* (max_)size reported to or requested of mds */
+ u64 i_wanted_max_size; /* offset we'd like to write too */
+ u64 i_requested_max_size; /* max_size we've requested */
struct timespec i_old_atime;
struct ceph_vino vino);
extern struct inode *ceph_get_snapdir(struct inode *parent);
extern void ceph_fill_file_bits(struct inode *inode, int issued,
- u64 truncate_seq, u64 size,
+ u32 truncate_seq, u64 truncate_size, u64 size,
u64 time_warp_seq, struct timespec *ctime,
struct timespec *mtime, struct timespec *atime);
extern int ceph_fill_trace(struct super_block *sb,