}
}
have = __ceph_caps_issued(ci, &implemented);
- /* HACK: force sync writes...
- have &= ~CEPH_CAP_WRBUFFER;
- implemented &= ~CEPH_CAP_WRBUFFER;
- */
+
+ /*
+ * disallow writes while a truncate is pending
+ */
+ if (ci->i_truncate_pending)
+ have &= ~CEPH_CAP_FILE_WR;
+
if ((have & need) == need) {
/*
* Look at (implemented & ~have & not) so that we keep waiting
}
/* size/ctime/mtime/atime? */
+ ceph_fill_file_size(inode, issued,
+ le32_to_cpu(grant->truncate_seq),
+ le64_to_cpu(grant->truncate_size), size);
ceph_decode_timespec(&mtime, &grant->mtime);
ceph_decode_timespec(&atime, &grant->atime);
ceph_decode_timespec(&ctime, &grant->ctime);
- ceph_fill_file_bits(inode, issued,
- le32_to_cpu(grant->truncate_seq),
- le64_to_cpu(grant->truncate_size), size,
+ ceph_fill_file_time(inode, issued,
le32_to_cpu(grant->time_warp_seq), &ctime, &mtime,
&atime);
struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds;
int seq = le32_to_cpu(trunc->seq);
+ u32 truncate_seq = le32_to_cpu(trunc->truncate_seq);
+ u64 truncate_size = le64_to_cpu(trunc->truncate_size);
u64 size = le64_to_cpu(trunc->size);
+ int implemented = 0;
+ int dirty = __ceph_caps_dirty(ci);
+ int issued = __ceph_caps_issued(ceph_inode(inode), &implemented);
int queue_trunc = 0;
+
+ issued |= implemented | dirty;
- dout(10, "handle_cap_trunc inode %p mds%d seq %d\n", inode, mds, seq);
- queue_trunc = __ceph_queue_vmtruncate(inode, size);
+ dout(10, "handle_cap_trunc inode %p mds%d seq %d to %lld seq %d\n",
+ inode, mds, seq, truncate_size, truncate_seq);
+ queue_trunc = ceph_fill_file_size(inode, issued,
+ truncate_seq, truncate_size, size);
spin_unlock(&inode->i_lock);
if (queue_trunc)
ci->i_truncate_seq = 0;
ci->i_truncate_size = 0;
+ ci->i_truncate_pending = 0;
ci->i_max_size = 0;
ci->i_reported_size = 0;
INIT_WORK(&ci->i_wb_work, ceph_inode_writeback);
INIT_WORK(&ci->i_pg_inv_work, ceph_inode_invalidate_pages);
- ci->i_vmtruncate_to = -1;
INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work);
INIT_LIST_HEAD(&ci->i_listener_list);
/*
- * Helper to fill in size, ctime, mtime, and atime. We have to be
+ * Helpers to fill in size, ctime, mtime, and atime. We have to be
* careful because either the client or MDS may have more up to date
* info, depending on which capabilities are held, and whether
* time_warp_seq or truncate_seq have increased. Ordinarily, mtime
* and size are monotonically increasing, except when utimes() or
* truncate() increments the corresponding _seq values on the MDS.
*/
-int ceph_fill_file_bits(struct inode *inode, int issued,
- u32 truncate_seq, u64 truncate_size, u64 size,
- u64 time_warp_seq, struct timespec *ctime,
- struct timespec *mtime, struct timespec *atime)
+int ceph_fill_file_size(struct inode *inode, int issued,
+ u32 truncate_seq, u64 truncate_size, u64 size)
{
struct ceph_inode_info *ci = ceph_inode(inode);
- int warn = 0;
int queue_trunc = 0;
if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
(truncate_seq == ci->i_truncate_seq && size > inode->i_size)) {
dout(10, "size %lld -> %llu\n", inode->i_size, size);
-
- if (issued & (CEPH_CAP_FILE_RDCACHE|CEPH_CAP_FILE_RD|
- CEPH_CAP_FILE_WR|CEPH_CAP_FILE_WRBUFFER|
- CEPH_CAP_FILE_EXCL))
- queue_trunc = __ceph_queue_vmtruncate(inode, size);
-
inode->i_size = size;
inode->i_blocks = (size + (1<<9) - 1) >> 9;
ci->i_reported_size = size;
dout(10, "truncate_seq %u -> %u\n",
ci->i_truncate_seq, truncate_seq);
ci->i_truncate_seq = truncate_seq;
+ ci->i_truncate_pending++;
+ if (issued & (CEPH_CAP_FILE_RDCACHE|CEPH_CAP_FILE_RD|
+ CEPH_CAP_FILE_WR|CEPH_CAP_FILE_WRBUFFER|
+ CEPH_CAP_FILE_EXCL))
+ queue_trunc = 1;
}
}
if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) >= 0 &&
truncate_size);
ci->i_truncate_size = truncate_size;
}
+ return queue_trunc;
+}
+
+void ceph_fill_file_time(struct inode *inode, int issued,
+ u64 time_warp_seq, struct timespec *ctime,
+ struct timespec *mtime, struct timespec *atime)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ int warn = 0;
if (issued & (CEPH_CAP_FILE_EXCL|
CEPH_CAP_FILE_WR|
if (warn) /* time_warp_seq shouldn't go backwards */
dout(10, "%p mds time_warp_seq %llu < %u\n",
inode, time_warp_seq, ci->i_time_warp_seq);
-
- return queue_trunc;
}
/*
ceph_decode_timespec(&atime, &info->atime);
ceph_decode_timespec(&mtime, &info->mtime);
ceph_decode_timespec(&ctime, &info->ctime);
- queue_trunc = ceph_fill_file_bits(inode, issued,
+ queue_trunc = ceph_fill_file_size(inode, issued,
le32_to_cpu(info->truncate_seq),
le64_to_cpu(info->truncate_size),
- le64_to_cpu(info->size),
- le32_to_cpu(info->time_warp_seq),
- &ctime, &mtime, &atime);
+ le64_to_cpu(info->size));
+ ceph_fill_file_time(inode, issued,
+ le32_to_cpu(info->time_warp_seq),
+ &ctime, &mtime, &atime);
ci->i_max_size = le64_to_cpu(info->max_size);
ci->i_layout = info->layout;
iput(inode);
}
-int __ceph_queue_vmtruncate(struct inode *inode, __u64 size)
-{
- struct ceph_inode_info *ci = ceph_inode(inode);
- int queue_trunc = 0;
-
- /*
- * vmtruncate lazily; we can't block on i_mutex in the message
- * handler path, or we deadlock against osd op replies needed
- * to complete the writes holding i_lock. vmtruncate will
- * also block on page locks held by writes...
- *
- * if its an expansion, and there is no truncate pending, we
- * don't need to truncate.
- */
- if (ci->i_vmtruncate_to < 0 && size > inode->i_size) {
- dout(10, "clean fwd truncate, no vmtruncate needed\n");
- } else if (ci->i_vmtruncate_to >= 0 && size >= ci->i_vmtruncate_to) {
- dout(10, "trunc to %lld < %lld already queued\n",
- ci->i_vmtruncate_to, size);
- } else {
- /* we need to trunc even smaller */
- dout(10, "queueing trunc %lld -> %lld\n", inode->i_size, size);
- ci->i_vmtruncate_to = size;
- queue_trunc = 1;
- }
- i_size_write(inode, size);
- ci->i_reported_size = size;
-
- return queue_trunc;
-}
-
/*
* called with i_mutex held.
*
void __ceph_do_pending_vmtruncate(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
- loff_t to;
- int wrbuffer_refs;
+ u64 to;
+ int wrbuffer_refs, finish = 0;
+
+ /*
+ * Trim the page cache down to i_truncate_size and retire every
+ * pending truncate in one go.
+ *
+ * ceph_fill_file_size() bumps i_truncate_pending once per truncate
+ * it records, but a single pass here already trims to the most
+ * recent i_truncate_size.  Retiring only one count per call could
+ * leave the counter nonzero (and CEPH_CAP_FILE_WR masked) with
+ * nothing left to drain it.  So: loop until i_truncate_size is
+ * unchanged across the truncate, then clear the whole count.
+ */
+retry:
spin_lock(&inode->i_lock);
- to = ci->i_vmtruncate_to;
- ci->i_vmtruncate_to = -1;
+ if (ci->i_truncate_pending == 0) {
+ dout(10, "__do_pending_vmtruncate %p none pending\n", inode);
+ spin_unlock(&inode->i_lock);
+ return;
+ }
+ /* snapshot the target under i_lock; it may move while we truncate */
+ to = ci->i_truncate_size;
wrbuffer_refs = ci->i_wrbuffer_ref;
+ dout(10, "__do_pending_vmtruncate %p (%d) to %llu\n", inode,
+ ci->i_truncate_pending, to);
spin_unlock(&inode->i_lock);
- if (to >= 0) {
- dout(10, "__do_pending_vmtruncate %p to %lld\n", inode, to);
- truncate_inode_pages(inode->i_mapping, to);
- if (wrbuffer_refs == 0)
- ceph_check_caps(ci, 0, 0, NULL);
- } else {
- dout(10, "__do_pending_vmtruncate %p nothing to do\n", inode);
- }
+ /* done without i_lock held */
+ truncate_inode_pages(inode->i_mapping, to);
+
+ spin_lock(&inode->i_lock);
+ if (to == ci->i_truncate_size) {
+ /* no newer truncate arrived meanwhile: all of them are satisfied */
+ ci->i_truncate_pending = 0;
+ finish = 1;
+ }
+ spin_unlock(&inode->i_lock);
+ if (!finish)
+ goto retry;
+
+ if (wrbuffer_refs == 0)
+ ceph_check_caps(ci, 0, 0, NULL);
+ /* the pending count is now zero; let cap waiters re-evaluate */
+ wake_up(&ci->i_cap_wq);
}
/*
int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */
- u32 i_truncate_seq;
- u64 i_truncate_size;
+ u32 i_truncate_seq; /* last truncate to smaller size */
+ u64 i_truncate_size; /* and the size we last truncated down to */
+ int i_truncate_pending; /* still need to call vmtruncate */
u64 i_max_size; /* max file size authorized by mds */
u64 i_reported_size; /* (max_)size reported to or requested of mds */
struct work_struct i_wb_work; /* writeback work */
struct work_struct i_pg_inv_work; /* page invalidation work */
- loff_t i_vmtruncate_to; /* delayed truncate work */
struct work_struct i_vmtruncate_work;
struct list_head i_listener_list; /* requests we pend on */
extern struct inode *ceph_get_inode(struct super_block *sb,
struct ceph_vino vino);
extern struct inode *ceph_get_snapdir(struct inode *parent);
-extern int ceph_fill_file_bits(struct inode *inode, int issued,
- u32 truncate_seq, u64 truncate_size, u64 size,
- u64 time_warp_seq, struct timespec *ctime,
- struct timespec *mtime, struct timespec *atime);
+extern int ceph_fill_file_size(struct inode *inode, int issued,
+ u32 truncate_seq, u64 truncate_size, u64 size);
+extern void ceph_fill_file_time(struct inode *inode, int issued,
+ u64 time_warp_seq, struct timespec *ctime,
+ struct timespec *mtime, struct timespec *atime);
extern int ceph_fill_trace(struct super_block *sb,
struct ceph_mds_request *req,
struct ceph_mds_session *session);
extern void ceph_inode_set_size(struct inode *inode, loff_t size);
extern void ceph_inode_writeback(struct work_struct *work);
extern void ceph_vmtruncate_work(struct work_struct *work);
-extern int __ceph_queue_vmtruncate(struct inode *inode, __u64 size);
extern void __ceph_do_pending_vmtruncate(struct inode *inode);
extern int ceph_do_getattr(struct dentry *dentry, int mask);