-/*
- * Add dirty inode to the sync (currently flushing) list.
- */
+/*
+ * Add dirty inode to the session's cap flushing list.
+ */
-static void __mark_caps_sync(struct inode *inode)
+static void __mark_caps_flushing(struct inode *inode,
+ struct ceph_mds_session *session)
{
struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc;
struct ceph_inode_info *ci = ceph_inode(inode);
BUG_ON(list_empty(&ci->i_dirty_item));
spin_lock(&mdsc->cap_dirty_lock);
- if (list_empty(&ci->i_sync_item)) {
- dout(20, " inode %p now sync\n", &ci->vfs_inode);
- list_add(&ci->i_sync_item, &mdsc->cap_sync);
+ if (list_empty(&ci->i_flushing_item)) {
+ dout(20, " inode %p now flushing\n", &ci->vfs_inode);
+ list_add(&ci->i_flushing_item, &session->s_cap_flushing);
+ mdsc->num_cap_flushing++;
}
spin_unlock(&mdsc->cap_dirty_lock);
}
ceph_cap_string(ci->i_flushing_caps | flushing));
ci->i_flushing_caps |= flushing;
ci->i_dirty_caps = 0;
- __mark_caps_sync(inode);
+ __mark_caps_flushing(inode, session);
}
mds = cap->mds; /* remember mds, so we don't repeat */
if (cap->session->s_state < CEPH_MDS_SESSION_OPEN)
goto out;
- __mark_caps_sync(inode);
+ __mark_caps_flushing(inode, session);
flushing = ci->i_dirty_caps;
dout(10, " flushing %s, flushing_caps %s -> %s\n",
}
+/*
+ * After a recovering MDS goes active, we need to resend any caps
+ * we were flushing.
+ *
+ * Caller holds session->s_mutex.
+ */
+void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+{
+ struct ceph_inode_info *ci;
+
+ dout(10, "kick_flushing_caps mds%d\n", session->s_mds);
+ list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
+ struct inode *inode = &ci->vfs_inode;
+ struct ceph_cap *cap;
+
+ spin_lock(&inode->i_lock);
+ cap = ci->i_auth_cap;
+ if (cap && cap->session == session) {
+ dout(20, "kick_flushing_caps %p cap %p %s\n", inode,
+ cap, ceph_cap_string(ci->i_flushing_caps));
+ __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
+ __ceph_caps_used(ci),
+ __ceph_caps_wanted(ci),
+ cap->issued | cap->implemented,
+ ci->i_flushing_caps);
+ } else {
+ dout(0, " %p auth cap %p not mds%d ???\n", inode, cap,
+ session->s_mds);
+ spin_unlock(&inode->i_lock);
+ }
+ }
+}
+
+
/*
* Take references to capabilities we hold, so that we don't release
* them to the MDS prematurely.
new_dirty = ci->i_dirty_caps | ci->i_flushing_caps;
if (old_dirty) {
spin_lock(&mdsc->cap_dirty_lock);
- list_del_init(&ci->i_sync_item);
- if (list_empty(&mdsc->cap_sync))
- wake_up(&mdsc->cap_sync_wq);
- dout(20, " inode %p now !sync\n", inode);
+ list_del_init(&ci->i_flushing_item);
+ if (!list_empty(&session->s_cap_flushing))
+ dout(20, " mds%d still flushing cap on %p\n",
+ session->s_mds,
+ &list_entry(session->s_cap_flushing.next,
+ struct ceph_inode_info,
+ i_flushing_item)->vfs_inode);
+ mdsc->num_cap_flushing--;
+ if (!mdsc->num_cap_flushing)
+ wake_up(&mdsc->cap_flushing_wq);
+ else
+ dout(20, " still %d caps flushing\n",
+ mdsc->num_cap_flushing);
+ dout(20, " inode %p now !flushing\n", inode);
if (!new_dirty) {
dout(20, " inode %p now clean\n", inode);
list_del_init(&ci->i_dirty_item);
s->s_num_cap_releases = 0;
INIT_LIST_HEAD(&s->s_cap_releases);
INIT_LIST_HEAD(&s->s_cap_releases_done);
+ INIT_LIST_HEAD(&s->s_cap_flushing);
dout(10, "register_session mds%d\n", mds);
if (mds >= mdsc->max_sessions) {
}
-
static int request_close_session(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
{
* sure it successfully forwarded our request before
* it died.
*/
- if (newstate >= CEPH_MDS_STATE_ACTIVE)
+ if (oldstate < CEPH_MDS_STATE_ACTIVE &&
+ newstate >= CEPH_MDS_STATE_ACTIVE) {
kick_requests(mdsc, i, 1);
+ ceph_kick_flushing_caps(mdsc, s);
+ }
}
}
INIT_LIST_HEAD(&mdsc->snap_flush_list);
spin_lock_init(&mdsc->snap_flush_lock);
INIT_LIST_HEAD(&mdsc->cap_dirty);
- INIT_LIST_HEAD(&mdsc->cap_sync);
+ mdsc->num_cap_flushing = 0;
spin_lock_init(&mdsc->cap_dirty_lock);
- init_waitqueue_head(&mdsc->cap_sync_wq);
+ init_waitqueue_head(&mdsc->cap_flushing_wq);
spin_lock_init(&mdsc->dentry_lru_lock);
INIT_LIST_HEAD(&mdsc->dentry_lru);
}
*/
static int are_no_sync_caps(struct ceph_mds_client *mdsc)
{
- int empty;
+ int num;
spin_lock(&mdsc->cap_dirty_lock);
- empty = list_empty(&mdsc->cap_sync);
+ num = mdsc->num_cap_flushing;
spin_unlock(&mdsc->cap_dirty_lock);
- dout(20, "are_no_sync_caps = %d\n", empty);
- return empty;
+ dout(20, "are_no_sync_caps = %d\n", num);
+ return num == 0;
}
void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{
dout(10, "sync\n");
ceph_check_delayed_caps(mdsc);
- wait_event(mdsc->cap_sync_wq, are_no_sync_caps(mdsc));
+ wait_event(mdsc->cap_flushing_wq, are_no_sync_caps(mdsc));
}
struct rb_root i_caps; /* cap list */
struct ceph_cap *i_auth_cap; /* authoritative cap, if any */
unsigned i_dirty_caps, i_flushing_caps; /* mask of dirtied fields */
- struct list_head i_dirty_item, i_sync_item;
+ struct list_head i_dirty_item, i_flushing_item;
wait_queue_head_t i_cap_wq; /* threads waiting on a capability */
unsigned long i_hold_caps_min; /* jiffies */
unsigned long i_hold_caps_max; /* jiffies */
extern void ceph_queue_caps_release(struct inode *inode);
extern int ceph_write_inode(struct inode *inode, int unused);
+extern void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session);
extern int ceph_get_cap_mds(struct inode *inode);
extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps);
extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);