* caller should hold snap_rwsem (read), s_mutex.
*/
static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
- int op, int used, int want, int retain, int flushing)
+ int op, int used, int want, int retain, int flushing,
+ unsigned *pflush_tid)
__releases(cap->ci->vfs_inode->i_lock)
{
struct ceph_inode_info *ci = cap->ci;
* first ack clean Ax.
*/
flush_tid = ++ci->i_cap_flush_last_tid;
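+	/* report the allocated flush tid back to the caller so it can
+	 * wait for this particular flush to be acked by the mds */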
+ if (pflush_tid)
+ *pflush_tid = flush_tid;
dout(" cap_flush_tid %d\n", (int)flush_tid);
for (i = 0; i < CEPH_CAP_BITS; i++)
if (flushing & (1 << i))
/* __send_cap drops i_lock */
delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, used, want,
- retain, flushing);
+ retain, flushing, NULL);
goto retry; /* retake i_lock and restart our cap scan. */
}
/*
* Try to flush dirty caps back to the auth mds.
*/
-static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session)
+static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session,
+ unsigned *flush_tid)
{
struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc;
struct ceph_inode_info *ci = ceph_inode(inode);
/* __send_cap drops i_lock */
__send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want,
- cap->issued | cap->implemented, flushing);
+ cap->issued | cap->implemented, flushing,
+ flush_tid);
goto out_unlocked;
}
out:
return flushing;
}
-static int caps_are_clean(struct inode *inode)
+/*
+ * Return true if we've flushed caps through the given flush_tid.
+ */
+static int caps_are_flushed(struct inode *inode, unsigned tid)
{
- int dirty;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+	int i, ret = 1;
+
 	spin_lock(&inode->i_lock);
-	dirty = __ceph_caps_dirty(ceph_inode(inode));
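+	/* a cap bit is still in flight for us if its flush was tagged
+	 * with a tid at or before ours and hasn't been acked yet */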
+ for (i = 0; i < CEPH_CAP_BITS; i++)
+ if ((ci->i_flushing_caps & (1 << i)) &&
+ ci->i_cap_flush_tid[i] <= tid) {
+ /* still flushing this bit */
+ ret = 0;
+ break;
+ }
spin_unlock(&inode->i_lock);
- return !dirty;
+ return ret;
+}
+
+/*
+ * Wait on any unsafe replies for the given inode. First wait on the
+ * newest request, and make that the upper bound. Then, if there are
+ * more requests, keep waiting on the oldest as long as it is still older
+ * than the original request.
+ */
+static void sync_write_wait(struct inode *inode)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct list_head *head = &ci->i_unsafe_writes;
+ struct ceph_osd_request *req;
+ u64 last_tid;
+
+ spin_lock(&ci->i_unsafe_lock);
+ if (list_empty(head))
+ goto out;
+
+ /* set upper bound as _last_ entry in chain */
+ req = list_entry(head->prev, struct ceph_osd_request,
+ r_unsafe_item);
+ last_tid = req->r_tid;
+
+ do {
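+		/* take a ref so the request can't be freed while we
+		 * sleep without i_unsafe_lock held */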
+ ceph_osdc_get_request(req);
+ spin_unlock(&ci->i_unsafe_lock);
+ dout("sync_write_wait on tid %llu (until %llu)\n",
+ req->r_tid, last_tid);
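+		/* r_safe_completion fires once the write is committed
+		 * (safe) on the osds */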
+ wait_for_completion(&req->r_safe_completion);
+ spin_lock(&ci->i_unsafe_lock);
+ ceph_osdc_put_request(req);
+
+ /*
+ * from here on look at first entry in chain, since we
+ * only want to wait for anything older than last_tid
+ */
+ if (list_empty(head))
+ break;
+ req = list_entry(head->next, struct ceph_osd_request,
+ r_unsafe_item);
+ } while (req->r_tid < last_tid);
+out:
+ spin_unlock(&ci->i_unsafe_lock);
+}
+
+int ceph_fsync(struct file *file, struct dentry *dentry, int datasync)
+{
+ struct inode *inode = dentry->d_inode;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ unsigned flush_tid;
+ int ret;
+ int dirty;
+
+ dout("fsync %p%s\n", inode, datasync ? " datasync" : "");
+ sync_write_wait(inode);
+
+ ret = filemap_write_and_wait(inode->i_mapping);
+ if (ret < 0)
+ return ret;
+
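+	/* flush dirty caps to the auth mds; flush_tid identifies
+	 * this flush so we can wait on it below */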
+ dirty = try_flush_caps(inode, NULL, &flush_tid);
+ dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
+
+ /*
+	 * only wait on metadata writeback other than the file size
+	 * and mtime (the mds can recover those, so we don't need
+	 * to wait for them)
+ */
+ if (!datasync && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
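+		/* wait until the mds acks the cap flush we started above */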
+ dout("fsync waiting for flush_tid %u\n", flush_tid);
+ ret = wait_event_interruptible(ci->i_cap_wq,
+ caps_are_flushed(inode, flush_tid));
+ }
+
+ dout("fsync %p%s done\n", inode, datasync ? " datasync" : "");
+ return ret;
}
/*
* Flush any dirty caps back to the mds. If we aren't asked to wait,
* queue inode for flush but don't do so immediately, because we can
- * get by with fewer MDS messages if we wait for e.g. data writeback
- * to complete first.
+ * get by with fewer MDS messages if we wait for data writeback to
+ * complete first.
*/
int ceph_write_inode(struct inode *inode, int wait)
{
struct ceph_inode_info *ci = ceph_inode(inode);
+ unsigned flush_tid;
int err = 0;
int dirty;
dout("write_inode %p wait=%d\n", inode, wait);
if (wait) {
- dirty = try_flush_caps(inode, NULL);
+ dirty = try_flush_caps(inode, NULL, &flush_tid);
if (dirty)
err = wait_event_interruptible(ci->i_cap_wq,
- caps_are_clean(inode));
+ caps_are_flushed(inode, flush_tid));
} else {
struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc;
return err;
}
-
/*
* After a recovering MDS goes active, we need to resend any caps
* we were flushing.
__ceph_caps_used(ci),
__ceph_caps_wanted(ci),
cap->issued | cap->implemented,
- ci->i_flushing_caps);
+ ci->i_flushing_caps, NULL);
} else {
pr_err("ceph %p auth cap %p not mds%d ???\n", inode,
cap, session->s_mds);
issued, wanted, seq, mseq, realmino,
ttl_ms, jiffies - ttl_ms/2, CEPH_CAP_FLAG_AUTH,
NULL /* no caps context */);
- try_flush_caps(inode, session);
+ try_flush_caps(inode, session, NULL);
up_read(&mdsc->snap_rwsem);
}
ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
}
-/*
- * Wait on any unsafe replies for the given inode. First wait on the
- * newest request, and make that the upper bound. Then, if there are
- * more requests, keep waiting on the oldest as long as it is still older
- * than the original request.
- */
-static void sync_write_wait(struct inode *inode)
-{
- struct ceph_inode_info *ci = ceph_inode(inode);
- struct list_head *head = &ci->i_unsafe_writes;
- struct ceph_osd_request *req;
- u64 last_tid;
-
- spin_lock(&ci->i_unsafe_lock);
- if (list_empty(head))
- goto out;
-
- /* set upper bound as _last_ entry in chain */
- req = list_entry(head->prev, struct ceph_osd_request,
- r_unsafe_item);
- last_tid = req->r_tid;
-
- do {
- ceph_osdc_get_request(req);
- spin_unlock(&ci->i_unsafe_lock);
- dout("sync_write_wait on tid %llu (until %llu)\n",
- req->r_tid, last_tid);
- wait_for_completion(&req->r_safe_completion);
- spin_lock(&ci->i_unsafe_lock);
- ceph_osdc_put_request(req);
-
- /*
- * from here on look at first entry in chain, since we
- * only want to wait for anything older than last_tid
- */
- if (list_empty(head))
- break;
- req = list_entry(head->next, struct ceph_osd_request,
- r_unsafe_item);
- } while (req->r_tid < last_tid);
-out:
- spin_unlock(&ci->i_unsafe_lock);
-}
-
/*
* Synchronous write, straight from __user pointer or user pages (if
* O_DIRECT).
return ret;
}
-static int ceph_fsync(struct file *file, struct dentry *dentry, int datasync)
-{
- struct inode *inode = dentry->d_inode;
- int ret;
-
- dout("fsync %p\n", inode);
- sync_write_wait(inode);
-
- ret = filemap_write_and_wait(inode->i_mapping);
- if (ret < 0)
- return ret;
-
- /*
- * Queue up the cap flush, but don't wait on it: the MDS can
- * recover from the object size/mtimes.
- */
- ceph_write_inode(inode, 0);
-
- return ret;
-}
-
const struct file_operations ceph_file_fops = {
.open = ceph_open,
.release = ceph_release,