}
follows = capsnap->follows;
- next_follows = follows + 1;
-
size = capsnap->size;
atime = capsnap->atime;
mtime = capsnap->mtime;
follows, mds);
spin_lock(&inode->i_lock);
+ next_follows = follows + 1;
goto retry;
}
continue; /* nothing extra, all good */
/* delay cap release for a bit? */
- if (time_before(jiffies, ci->i_hold_caps_until)) {
+ if (!is_delayed &&
+ time_before(jiffies, ci->i_hold_caps_until)) {
dout(30, "delaying cap release\n");
continue;
}
case CEPH_CAP_OP_RELEASED:
handle_cap_released(inode, h, session);
up_write(&mdsc->snap_rwsem);
+ if (list_empty(&session->s_caps))
+ ceph_mdsc_flushed_all_caps(mdsc, session);
break;
case CEPH_CAP_OP_FLUSHEDSNAP:
case CEPH_CAP_OP_EXPORT:
handle_cap_export(inode, h, session);
up_write(&mdsc->snap_rwsem);
+ if (list_empty(&session->s_caps))
+ ceph_mdsc_flushed_all_caps(mdsc, session);
break;
case CEPH_CAP_OP_IMPORT:
spin_unlock(&mdsc->cap_delay_lock);
}
-
-/*
- * Force a flush of any snap_caps and write caps we hold.
- *
- * Caller holds snap_rwsem, s_mutex.
- */
-void ceph_flush_write_caps(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session)
-{
- struct list_head *p, *n;
-
- dout(10, "flush_write_caps mds%d\n", session->s_mds);
- list_for_each_safe (p, n, &session->s_caps) {
- struct ceph_cap *cap =
- list_entry(p, struct ceph_cap, session_caps);
- struct inode *inode = &cap->ci->vfs_inode;
- int used, wanted;
-
- spin_lock(&inode->i_lock);
-
- if ((cap->implemented & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) == 0) {
- spin_unlock(&inode->i_lock);
- continue;
- }
-
- /* FIXME */
- if (!list_empty(&cap->ci->i_cap_snaps))
- __ceph_flush_snaps(cap->ci, NULL);
-
- used = __ceph_caps_used(cap->ci);
- wanted = __ceph_caps_wanted(cap->ci);
- if (used || wanted) {
- derr(0, "residual caps on %p u %d w %d s=%llu wrb=%d\n",
- inode, used, wanted, inode->i_size,
- cap->ci->i_wrbuffer_ref);
- used = wanted = 0;
- }
-
- /* __send_cap drops i_lock */
- __send_cap(mdsc, session, cap, used, wanted);
- }
-}
-
-
case CEPH_MDS_SESSION_NEW: return "new";
case CEPH_MDS_SESSION_OPENING: return "opening";
case CEPH_MDS_SESSION_OPEN: return "open";
+ case CEPH_MDS_SESSION_FLUSHING: return "flushing";
case CEPH_MDS_SESSION_CLOSING: return "closing";
case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
default: return "???";
wake_up_session_caps(session);
}
+
+
+static int request_close_session(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+{
+ struct ceph_msg *msg;
+ int err = 0;
+
+ msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
+ session->s_seq);
+ if (IS_ERR(msg))
+ err = PTR_ERR(msg);
+ else
+ ceph_send_msg_mds(mdsc, msg, session->s_mds);
+ return err;
+}
+
+/*
+ * check all caps on a session, without allowing release to
+ * be delayed.
+ */
+static void check_all_caps(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+{
+ struct list_head *p, *n;
+
+ list_for_each_safe (p, n, &session->s_caps) {
+ struct ceph_cap *cap =
+ list_entry(p, struct ceph_cap, session_caps);
+ struct inode *inode = &cap->ci->vfs_inode;
+
+ igrab(inode);
+ mutex_unlock(&session->s_mutex);
+ ceph_check_caps(ceph_inode(inode), 1);
+ mutex_lock(&session->s_mutex);
+ iput(inode);
+ }
+}
+
+/*
+ * Called with s_mutex held.
+ */
+static int __close_session(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+{
+ int mds = session->s_mds;
+ int err = 0;
+
+ dout(10, "close_session mds%d state=%s\n", mds,
+ session_state_name(session->s_state));
+ if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
+ return 0;
+
+ check_all_caps(mdsc, session);
+
+ if (list_empty(&session->s_caps)) {
+ session->s_state = CEPH_MDS_SESSION_CLOSING;
+ err = request_close_session(mdsc, session);
+ } else {
+ session->s_state = CEPH_MDS_SESSION_FLUSHING;
+ }
+ return err;
+}
+
+/*
+ * Called when the last cap for a session has been flushed or
+ * exported.
+ */
+void ceph_mdsc_flushed_all_caps(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+{
+ dout(10, "flushed_all_caps for mds%d state %s\n", session->s_mds,
+ session_state_name(session->s_state));
+ if (session->s_state == CEPH_MDS_SESSION_FLUSHING) {
+ session->s_state = CEPH_MDS_SESSION_CLOSING;
+ request_close_session(mdsc, session);
+ }
+}
+
+
/*
* handle a mds session control message
*/
mutex_lock(&session->s_mutex);
- dout(2, "handle_session mds%d %s %p seq %llu\n",
- mds, ceph_session_op_name(op), session, seq);
+ dout(2, "handle_session mds%d %s %p state %s seq %llu\n",
+ mds, ceph_session_op_name(op), session,
+ session_state_name(session->s_state), seq);
switch (op) {
case CEPH_SESSION_OPEN:
session->s_state = CEPH_MDS_SESSION_OPEN;
renewed_caps(mdsc, session, 0);
complete(&session->s_completion);
+ if (mdsc->stopping)
+ __close_session(mdsc, session);
break;
case CEPH_SESSION_RENEWCAPS:
oldstate = ceph_mdsmap_get_state(oldmap, i);
newstate = ceph_mdsmap_get_state(newmap, i);
- dout(20, "check_new_map mds%d state %d -> %d state %s\n",
+ dout(20, "check_new_map mds%d state %d -> %d (session %s)\n",
i, oldstate, newstate, session_state_name(s->s_state));
if (newstate < oldstate) {
/* if the state moved backwards, that means
-static int request_close_session(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session)
-{
- struct ceph_msg *msg;
- int err = 0;
-
- msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
- session->s_seq);
- if (IS_ERR(msg))
- err = PTR_ERR(msg);
- else
- ceph_send_msg_mds(mdsc, msg, session->s_mds);
- return err;
-}
-
-static int close_session(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session)
-{
- int mds = session->s_mds;
- int err = 0;
-
- dout(10, "close_session mds%d\n", mds);
- mutex_lock(&session->s_mutex);
-
- if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
- goto done;
-
- ceph_flush_write_caps(mdsc, session);
-
- session->s_state = CEPH_MDS_SESSION_CLOSING;
- err = request_close_session(mdsc, session);
-
-done:
- mutex_unlock(&session->s_mutex);
- return err;
-}
-
-
/*
* leases
*/
dout(10, "close_sessions\n");
mdsc->stopping = 1;
- /* clean out cap delay list */
+ /*
+ * clean out the delayed cap list; we will flush everything
+ * explicitly below.
+ */
spin_lock(&mdsc->cap_delay_lock);
while (!list_empty(&mdsc->cap_delay_list)) {
struct ceph_inode_info *ci;
if (!session)
continue;
mutex_unlock(&mdsc->mutex);
- close_session(mdsc, session);
+ mutex_lock(&session->s_mutex);
+ __close_session(mdsc, session);
+ mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session);
mutex_lock(&mdsc->mutex);
n++;
dout(10, "waiting for sessions to close\n");
mutex_unlock(&mdsc->mutex);
-
wait_for_completion_timeout(&mdsc->session_close_waiters,
timeout);
mutex_lock(&mdsc->mutex);