From 000defc5c2a39c716535dd903bb75fdf66334c71 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 15 Oct 2008 16:33:35 -0700 Subject: [PATCH] kclient: caps.c comments, cleanup --- src/TODO | 1 + src/kernel/caps.c | 513 +++++++++++++++++++++++++++------------------ src/kernel/super.h | 9 - 3 files changed, 315 insertions(+), 208 deletions(-) diff --git a/src/TODO b/src/TODO index 99d53edd7bb26..1938c322b12d7 100644 --- a/src/TODO +++ b/src/TODO @@ -47,6 +47,7 @@ snaps on osd - efficient recovery of clones using the clone diff info kernel client +- clean up cap flush on session close - make writepages maybe skip pages with errors? - EIO, or ENOSPC? - ... writeback vs ENOSPC vs flush vs close()... hrm... diff --git a/src/kernel/caps.c b/src/kernel/caps.c index 023a4c3cd9d02..5f75c8db53b1b 100644 --- a/src/kernel/caps.c +++ b/src/kernel/caps.c @@ -15,6 +15,11 @@ int ceph_debug_caps = -1; #include "messenger.h" +/* + * Find ceph_cap for given mds, if any. + * + * Called with i_lock held. + */ static struct ceph_cap *__get_cap_for_mds(struct inode *inode, int mds) { struct ceph_inode_info *ci = ceph_inode(inode); @@ -33,37 +38,17 @@ static struct ceph_cap *__get_cap_for_mds(struct inode *inode, int mds) return NULL; } -static void __insert_cap_node(struct ceph_inode_info *ci, - struct ceph_cap *new) -{ - struct rb_node **p = &ci->i_caps.rb_node; - struct rb_node *parent = NULL; - struct ceph_cap *cap = NULL; - - while (*p) { - parent = *p; - cap = rb_entry(parent, struct ceph_cap, ci_node); - if (new->mds < cap->mds) - p = &(*p)->rb_left; - else if (new->mds > cap->mds) - p = &(*p)->rb_right; - else - BUG(); - } - - rb_link_node(&new->ci_node, parent, p); - rb_insert_color(&new->ci_node, &ci->i_caps); -} - -int __ceph_get_cap_mds(struct ceph_inode_info *ci, u32 *mseq) +/* + * Return id of any MDS with a cap, preferably WR|WRBUFFER|EXCL, else + * -1. + */ +static int __ceph_get_cap_mds(struct ceph_inode_info *ci, u32 *mseq) { struct ceph_cap *cap; int mds = -1; struct rb_node *p; - /* - * prefer mds with WR|WRBUFFER|EXCL caps - */ + /* prefer mds with WR|WRBUFFER|EXCL caps */ for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { cap = rb_entry(p, struct ceph_cap, ci_node); mds = cap->mds; @@ -85,7 +70,37 @@ int ceph_get_cap_mds(struct inode *inode) } /* - * caller should hold session snap_rwsem, s_mutex. + * Called under i_lock. + */ +static void __insert_cap_node(struct ceph_inode_info *ci, + struct ceph_cap *new) +{ + struct rb_node **p = &ci->i_caps.rb_node; + struct rb_node *parent = NULL; + struct ceph_cap *cap = 0; + + while (*p) { + parent = *p; + cap = rb_entry(parent, struct ceph_cap, ci_node); + if (new->mds < cap->mds) + p = &(*p)->rb_left; + else if (new->mds > cap->mds) + p = &(*p)->rb_right; + else + BUG(); + } + + rb_link_node(&new->ci_node, parent, p); + rb_insert_color(&new->ci_node, &ci->i_caps); +} + +/* + * Add a capability under the given MDS session, after processing + * the snapblob (to update the snap realm hierarchy). + * + * Bump i_count when adding it's first cap. + * + * Caller should hold session snap_rwsem, s_mutex. * * @fmode can be negative, in which case it is ignored. */ @@ -96,35 +111,32 @@ int ceph_add_cap(struct inode *inode, void *snapblob, int snapblob_len, struct ceph_cap *new_cap) { - int mds = session->s_mds; + struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_cap *cap; + struct ceph_snap_realm *realm; + int mds = session->s_mds; int is_first = 0; - struct ceph_snap_realm *realm = NULL; - struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc; - if (snapblob_len) - realm = ceph_update_snap_trace(mdsc, - snapblob, snapblob+snapblob_len, - 0); + realm = ceph_update_snap_trace(mdsc, snapblob, snapblob+snapblob_len,0); - dout(10, "ceph_add_cap on %p mds%d cap %d seq %d\n", inode, + dout(10, "add_cap on %p mds%d cap %d seq %d\n", inode, session->s_mds, issued, seq); retry: spin_lock(&inode->i_lock); cap = __get_cap_for_mds(inode, mds); if (!cap) { - if (!cap) { - if (new_cap) { - cap = new_cap; - new_cap = NULL; - } else { - spin_unlock(&inode->i_lock); - new_cap = kmalloc(sizeof(*cap), GFP_NOFS); - if (new_cap == NULL) - return -ENOMEM; - goto retry; + if (new_cap) { + cap = new_cap; + new_cap = NULL; + } else { + spin_unlock(&inode->i_lock); + new_cap = kmalloc(sizeof(*cap), GFP_NOFS); + if (new_cap == NULL) { + ceph_put_snap_realm(realm); + return -ENOMEM; } + goto retry; } cap->issued = cap->implemented = 0; @@ -139,7 +151,7 @@ retry: list_add(&cap->session_caps, &session->s_caps); session->s_nr_caps++; - /* clear out old exporting info? */ + /* clear out old exporting info? (i.e. on cap import) */ if (ci->i_cap_exporting_mds == mds) { ci->i_cap_exporting_issued = 0; ci->i_cap_exporting_mseq = 0; @@ -169,6 +181,11 @@ retry: return 0; } +/* + * Return set of valid cap bits issued to us. Note that caps time + * out, and may be invalidated in bulk if the client session times out + * and session->s_cap_gen is bumped. + */ int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented) { int have = ci->i_snap_caps; @@ -204,7 +221,7 @@ int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented) * caller should hold i_lock, snap_rwsem, and session s_mutex. * returns true if this is the last cap. if so, caller should iput. */ -int __ceph_remove_cap(struct ceph_cap *cap) +static int __ceph_remove_cap(struct ceph_cap *cap) { struct ceph_mds_session *session = cap->session; struct ceph_inode_info *ci = cap->ci; @@ -223,6 +240,8 @@ int __ceph_remove_cap(struct ceph_cap *cap) if (RB_EMPTY_ROOT(&ci->i_caps)) { list_del_init(&ci->i_snap_realm_item); + ceph_put_snap_realm(ci->i_snap_realm); + ci->i_snap_realm = NULL; return 1; } return 0; @@ -244,10 +263,13 @@ void ceph_remove_cap(struct ceph_cap *cap) } /* - * caller holds i_lock - * -> client->cap_delay_lock + * + * (Re)queue cap at the end of the delayed cap release list. + * + * Caller holds i_lock + * -> we take mdsc->cap_delay_lock */ -static void __ceph_cap_delay_requeue(struct ceph_mds_client *mdsc, +void __cap_delay_requeue(struct ceph_mds_client *mdsc, struct ceph_inode_info *ci) { ci->i_hold_caps_until = round_jiffies(jiffies + HZ * 5); @@ -264,19 +286,37 @@ static void __ceph_cap_delay_requeue(struct ceph_mds_client *mdsc, spin_unlock(&mdsc->cap_delay_lock); } +/* + * Cancel delayed work on cap. + * caller hold s_mutex, snap_rwsem. + */ +static void __cap_delay_cancel(struct ceph_mds_client *mdsc, + struct ceph_inode_info *ci) +{ + dout(10, "__cap_delay_cancel %p\n", &ci->vfs_inode); + if (list_empty(&ci->i_cap_delay_list)) + return; + spin_lock(&mdsc->cap_delay_lock); + list_del_init(&ci->i_cap_delay_list); + spin_unlock(&mdsc->cap_delay_lock); + iput(&ci->vfs_inode); +} - - -static void send_cap(struct ceph_mds_client *mdsc, __u64 ino, int op, - int caps, int wanted, __u64 seq, __u64 mseq, - __u64 size, __u64 max_size, - struct timespec *mtime, struct timespec *atime, - u64 time_warp_seq, u64 follows, int mds) +/* + * Build and send a cap message to the given MDS. + * + * Caller should be holding s_mutex. + */ +static void send_cap_msg(struct ceph_mds_client *mdsc, __u64 ino, int op, + int caps, int wanted, __u64 seq, __u64 mseq, + __u64 size, __u64 max_size, + struct timespec *mtime, struct timespec *atime, + u64 time_warp_seq, u64 follows, int mds) { struct ceph_mds_caps *fc; struct ceph_msg *msg; - dout(10, "send_cap %s %llx caps %d wanted %d seq %llu/%llu" + dout(10, "send_cap_msg %s %llx caps %d wanted %d seq %llu/%llu" " follows %lld size %llu\n", ceph_cap_op_name(op), ino, caps, wanted, seq, mseq, follows, size); @@ -306,11 +346,94 @@ static void send_cap(struct ceph_mds_client *mdsc, __u64 ino, int op, ceph_send_msg_mds(mdsc, msg, mds); } +/* + * + * Send a cap msg on the given inode. Make note of max_size + * reported/requested from mds, revoked caps that have now been + * implemented. + * + * Also, try to invalidate page cache if we are dropping RDCACHE. + * Note that this will leave behind any locked pages... FIXME! + * + * called with i_lock, then drops it. + * caller should hold snap_rwsem, s_mutex. + */ +void __send_cap(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, + struct ceph_cap *cap, + int used, int wanted) +{ + struct ceph_inode_info *ci = cap->ci; + struct inode *inode = &ci->vfs_inode; + int revoking = cap->implemented & ~cap->issued; + int dropping = cap->issued & ~wanted; + int keep; + u64 seq, mseq, time_warp_seq, follows; + u64 size, max_size; + struct timespec mtime, atime; + int wake = 0; + int op = CEPH_CAP_OP_ACK; + + if (wanted == 0) + op = CEPH_CAP_OP_RELEASE; + + dout(10, "__send_cap cap %p session %p %d -> %d\n", cap, cap->session, + cap->issued, cap->issued & wanted); + cap->issued &= wanted; /* drop bits we don't want */ + + if (revoking && (revoking && used) == 0) { + cap->implemented = cap->issued; + /* + * Wake up any waiters on wanted -> needed transition. + * This is due to the weird transition from buffered + * to sync IO... we need to flush dirty pages _before_ + * allowing sync writes to avoid reordering. + */ + wake = 1; + } + + keep = cap->issued; + seq = cap->seq; + mseq = cap->mseq; + size = inode->i_size; + ci->i_reported_size = size; + max_size = ci->i_wanted_max_size; + ci->i_requested_max_size = max_size; + mtime = inode->i_mtime; + atime = inode->i_atime; + time_warp_seq = ci->i_time_warp_seq; + follows = ci->i_snap_realm->cached_context->seq; + spin_unlock(&inode->i_lock); + + if (dropping & CEPH_CAP_RDCACHE) { + /* invalidate what we can */ + dout(20, "invalidating pages on %p\n", inode); + invalidate_mapping_pages(&inode->i_data, 0, -1); + } + + send_cap_msg(mdsc, ceph_vino(inode).ino, + op, keep, wanted, seq, mseq, + size, max_size, &mtime, &atime, time_warp_seq, + follows, session->s_mds); + + if (wake) + wake_up(&ci->i_cap_wq); +} + + +/* + * When a snapshot is taken, clients accumulate "dirty" data on inodes + * with capabilities in ceph_cap_snaps to describe the file state at + * the time the snapshot was taken. This must be flushed + * asynchronously back to the MDS once sync writes complete and dirty + * data is written out. + * + * Called under i_lock. Takes s_mutex as needed. + */ void __ceph_flush_snaps(struct ceph_inode_info *ci) { struct inode *inode = &ci->vfs_inode; int mds; - u64 follows = 0; struct list_head *p; struct ceph_cap_snap *capsnap; int issued; @@ -319,14 +442,24 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci) u64 time_warp_seq; u32 mseq; struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc; - struct ceph_mds_session *session = NULL; + struct ceph_mds_session *session = NULL; /* if session != NULL, we hold + session->s_mutex */ + u64 follows = 0; /* keep track of how far we've gotten through the + i_cap_snaps list, and skip these entries next time + around to avoid an infinite loop */ dout(10, "__flush_snaps %p\n", inode); retry: list_for_each(p, &ci->i_cap_snaps) { capsnap = list_entry(p, struct ceph_cap_snap, ci_item); + + /* avoid an infiniute loop after retry */ if (capsnap->follows <= follows) continue; + /* + * we need to wait for sync writes to complete and for dirty + * pages to be written out. + */ if (capsnap->dirty || capsnap->writing) continue; @@ -348,6 +481,11 @@ retry: session); mutex_lock(&session->s_mutex); } + /* + * if session == NULL, we raced against a cap + * deletion. retry, and we'll get a better + * @mds value next time. + */ spin_lock(&inode->i_lock); goto retry; } @@ -363,12 +501,11 @@ retry: dout(10, "flush_snaps %p cap_snap %p follows %lld size %llu\n", inode, capsnap, follows, size); - - send_cap(mdsc, ceph_vino(inode).ino, - CEPH_CAP_OP_FLUSHSNAP, issued, 0, 0, mseq, - size, 0, - &mtime, &atime, time_warp_seq, - follows, mds); + send_cap_msg(mdsc, ceph_vino(inode).ino, + CEPH_CAP_OP_FLUSHSNAP, issued, 0, 0, mseq, + size, 0, + &mtime, &atime, time_warp_seq, + follows, mds); spin_lock(&inode->i_lock); goto retry; @@ -381,11 +518,12 @@ retry: } - /* - * examine currently used, wanted versus held caps. - * release, ack revoked caps to mds as appropriate. - * @is_delayed if caller just dropped a cap ref, and we probably want to delay + * Swiss army knife function to examine currently used, wanted versus + * held caps. Release, flush, ack revoked caps to mds as appropriate. + * + * @is_delayed indicates caller is delayed work and we should not + * delay further. */ void ceph_check_caps(struct ceph_inode_info *ci, int is_delayed) { @@ -397,12 +535,13 @@ void ceph_check_caps(struct ceph_inode_info *ci, int is_delayed) struct ceph_mds_session *session = NULL; /* if non-NULL, i hold s_mutex */ int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */ int revoking; - int mds = -1; + int mds = -1; /* keep track of how far we've gone through i_caps list + to avoid an infinite loop on retry */ struct rb_node *p; - spin_lock(&inode->i_lock); - /* flush snaps, first time around only */ + + /* flush snaps first time around only */ if (!list_empty(&ci->i_cap_snaps)) __ceph_flush_snaps(ci); goto first; @@ -415,29 +554,26 @@ first: inode, wanted, used, __ceph_caps_issued(ci, NULL)); if (!is_delayed) - __ceph_cap_delay_requeue(mdsc, ci); + __cap_delay_requeue(mdsc, ci); for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { cap = rb_entry(p, struct ceph_cap, ci_node); + + /* avoid looping forever */ if (mds >= cap->mds) continue; - /* note: no side-effects allowed, until we take s_mutex */ + /* NOTE: no side-effects allowed, until we take s_mutex */ + revoking = cap->implemented & ~cap->issued; if (revoking) dout(10, "mds%d revoking %d\n", cap->mds, revoking); + /* request larger max_size from MDS? */ if (ci->i_wanted_max_size > ci->i_max_size && ci->i_wanted_max_size > ci->i_requested_max_size) goto ack; - /* completed revocation? */ - if (revoking && (revoking && used) == 0) { - dout(10, "completed revocation of %d\n", - cap->implemented & ~cap->issued); - goto ack; - } - /* approaching file_max? */ if ((cap->issued & CEPH_CAP_WR) && (inode->i_size << 1) >= ci->i_max_size && @@ -446,11 +582,18 @@ first: goto ack; } + /* completed revocation? */ + if (revoking && (revoking && used) == 0) { + dout(10, "completed revocation of %d\n", + cap->implemented & ~cap->issued); + goto ack; + } + if ((cap->issued & ~wanted) == 0) continue; /* nothing extra, all good */ + /* delay cap release for a bit? */ if (time_before(jiffies, ci->i_hold_caps_until)) { - /* delaying cap release for a bit */ dout(30, "delaying cap release\n"); continue; } @@ -468,7 +611,6 @@ ack: } took_snap_rwsem = 1; } - /* take s_mutex, one way or another */ if (session && session != cap->session) { dout(30, "oops, wrong session %p mutex\n", session); mutex_unlock(&session->s_mutex); @@ -487,9 +629,8 @@ ack: mds = cap->mds; /* remember mds, so we don't repeat */ - /* send_cap drops i_lock */ - __ceph_send_cap(mdsc, session, cap, - used, wanted, 0); + /* __send_cap drops i_lock */ + __send_cap(mdsc, session, cap, used, wanted); goto retry; /* retake i_lock and restart our cap scan. */ } @@ -505,9 +646,11 @@ ack: /* - * cap refs + * Track references to capabilities we hold, so that we don't release + * them to the MDS prematurely. + * + * Protected by i_lock. */ - static void __take_cap_refs(struct ceph_inode_info *ci, int got) { if (got & CEPH_CAP_RD) @@ -523,14 +666,13 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got) } } -void ceph_take_cap_refs(struct ceph_inode_info *ci, int got) -{ - dout(30, "take_cap_refs on %p taking %d\n", &ci->vfs_inode, got); - spin_lock(&ci->vfs_inode.i_lock); - __take_cap_refs(ci, got); - spin_unlock(&ci->vfs_inode.i_lock); -} - +/* + * Try to grab cap references. Specify those refs we @want, and the + * minimal set we @need. Also include the larger offset we are writing + * to (when applicable), and check against max_size here as well. + * Note that caller is responsible for ensuring max_size increases are + * requested from the MDS. + */ int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got, loff_t endoff) { @@ -546,6 +688,10 @@ int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got, inode, endoff, ci->i_max_size); goto sorry; } + /* + * If a sync write is in progress, we must wait, so that we + * can get a final snapshot value for size+mtime. + */ if (__ceph_have_pending_cap_snap(ci)) { dout(20, "get_cap_refs %p cap_snap_pending\n", inode); goto sorry; @@ -558,8 +704,8 @@ int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got, */ if ((have & need) == need) { /* - * look at (implemented & ~have & not) so that we keep waiting - * on transition from wanted -> needed caps. this is needed + * Look at (implemented & ~have & not) so that we keep waiting + * on transition from wanted -> needed caps. This is needed * for WRBUFFER|WR -> WR to avoid a new WR sync write from * going before a prior buffered writeback happens. */ @@ -582,6 +728,15 @@ sorry: return ret; } +/* + * Release cap refs. + * + * If we released the last ref on any given cap, call ceph_check_caps + * to release (or schedule a release). + * + * If we are releasing a WR cap (from a sync write), finalize any affected + * cap_snap, and wake up any waiters. + */ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) { struct inode *inode = &ci->vfs_inode; @@ -626,6 +781,13 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) wake_up(&ci->i_cap_wq); } +/* + * Release @nr WRBUFFER refs on dirty pages for the given @snapc snap + * context. Adjust per-snap dirty page accounting as appropriate. + * Once all dirty data for a cap_snap is flushed, flush snapped file + * metadata back to the MDS. If we dropped the last ref, call + * ceph_check_caps. + */ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, struct ceph_snap_context *snapc) { @@ -677,10 +839,10 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, - - - /* + * Handle a cap GRANT message from the MDS. (Note that a GRANT may + * actually be a revocation if it specifies a smaller cap set.) + * * caller holds s_mutex. NOT snap_rwsem. * return value: * 0 - ok @@ -760,7 +922,7 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, cap->seq = seq; - /* layout may have changed */ + /* file layout may have changed */ ci->i_layout = grant->layout; /* revocation? */ @@ -771,9 +933,12 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, if ((used & ~newcaps) & CEPH_CAP_WRBUFFER) writeback = 1; /* will delay ack */ else { + /* + * we're not using revoked caps.. ack now. + * re-use incoming message. + */ cap->implemented = newcaps; - /* ack now. re-use incoming message. */ grant->size = le64_to_cpu(inode->i_size); grant->max_size = 0; /* don't re-request */ ceph_encode_timespec(&grant->mtime, &inode->i_mtime); @@ -796,8 +961,7 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, cap->issued = newcaps; cap->implemented |= newcaps; /* add bits only, to * avoid stepping on a - * pending - * revocation */ + * pending revocation */ wake = 1; } @@ -812,7 +976,7 @@ out: dout(10, "queueing %p for writeback\n", inode); ceph_queue_writeback(inode); } - if (invalidate) /* do what we can */ + if (invalidate) /* do what we can.. FIXME */ invalidate_mapping_pages(&inode->i_data, 0, -1); if (wake) wake_up(&ci->i_cap_wq); @@ -821,20 +985,9 @@ out: /* - * caller hold s_mutex, snap_rwsem. + * Handle RELEASE from MDS. That means we can throw away our cap + * state as the MDS has fully flushed that metadata to disk. */ -static void __cap_delay_cancel(struct ceph_mds_client *mdsc, - struct ceph_inode_info *ci) -{ - dout(10, "__cap_delay_cancel %p\n", &ci->vfs_inode); - if (list_empty(&ci->i_cap_delay_list)) - return; - spin_lock(&mdsc->cap_delay_lock); - list_del_init(&ci->i_cap_delay_list); - spin_unlock(&mdsc->cap_delay_lock); - iput(&ci->vfs_inode); -} - static void handle_cap_released(struct inode *inode, struct ceph_mds_caps *m, struct ceph_mds_session *session) @@ -860,7 +1013,10 @@ static void handle_cap_released(struct inode *inode, /* - * caller hold s_mutex, snap_rwsem. + * Handle FLUSHEDSNAP. MDS has flushed snap data to disk and we can + * throw away our cap_snap. + * + * Caller hold s_mutex, snap_rwsem. */ static void handle_cap_flushedsnap(struct inode *inode, struct ceph_mds_caps *m, @@ -878,7 +1034,7 @@ static void handle_cap_flushedsnap(struct inode *inode, list_for_each(p, &ci->i_cap_snaps) { capsnap = list_entry(p, struct ceph_cap_snap, ci_item); if (capsnap->follows == follows) { - WARN_ON(capsnap->dirty); + WARN_ON(capsnap->dirty || capsnap->writing); dout(10, " removing cap_snap %p follows %lld\n", capsnap, follows); ceph_put_snap_context(capsnap->context); @@ -892,6 +1048,8 @@ static void handle_cap_flushedsnap(struct inode *inode, /* + * Handle TRUNC from MDS, indicating file truncation. + * * caller hold s_mutex, NOT snap_rwsem. */ static void handle_cap_trunc(struct inode *inode, @@ -916,7 +1074,6 @@ static void handle_cap_trunc(struct inode *inode, * if its an expansion, and there is no truncate pending, we * don't need to truncate. */ - spin_lock(&inode->i_lock); if (ci->i_vmtruncate_to < 0 && size > inode->i_size) dout(10, "clean fwd truncate, no vmtruncate needed\n"); @@ -939,6 +1096,11 @@ static void handle_cap_trunc(struct inode *inode, } /* + * Handle EXPORT from MDS. Cap is being migrated _from_ this mds to a + * different one. If we are the most recent migration we've seen (as + * indicated by mseq), make note of the migrating cap bits for the + * duration (until we see the corresponding IMPORT). + * * caller holds s_mutex, snap_rwsem */ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, @@ -950,6 +1112,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, struct ceph_cap *cap = NULL, *t; struct rb_node *p; int was_last = 0; + int remember = 1; dout(10, "handle_cap_export inode %p ci %p mds%d mseq %d\n", inode, ci, mds, mseq); @@ -962,28 +1125,32 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, if (t->mseq > mseq) { dout(10, " higher mseq on cap from mds%d\n", t->session->s_mds); - goto out; + remember = 0; } if (t->session->s_mds == mds) cap = t; } if (cap) { - /* make note, and remove */ - ci->i_cap_exporting_mds = mds; - ci->i_cap_exporting_mseq = mseq; - ci->i_cap_exporting_issued = cap->issued; + if (remember) { + /* make note */ + ci->i_cap_exporting_mds = mds; + ci->i_cap_exporting_mseq = mseq; + ci->i_cap_exporting_issued = cap->issued; + } was_last = __ceph_remove_cap(cap); } else WARN_ON(!cap); -out: spin_unlock(&inode->i_lock); if (was_last) iput(inode); } /* + * Handle cap IMPORT. If there are temp bits from an older EXPORT, + * clean them up. + * * caller holds s_mutex, snap_rwsem */ static void handle_cap_import(struct inode *inode, struct ceph_mds_caps *im, @@ -1015,7 +1182,13 @@ static void handle_cap_import(struct inode *inode, struct ceph_mds_caps *im, } - +/* + * Handle a CEPH_CAPS message from the MDS. + * + * Identify the appropriate session, inode, and call the right handler + * based on the cap op. Take read or write lock on snap_rwsem as + * appropriate. + */ void ceph_handle_caps(struct ceph_mds_client *mdsc, struct ceph_msg *msg) { @@ -1064,8 +1237,8 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc, inode); if (!inode) { dout(10, " i don't have ino %llx, sending release\n", vino.ino); - send_cap(mdsc, vino.ino, CEPH_CAP_OP_RELEASE, 0, 0, seq, - size, 0, 0, NULL, NULL, 0, 0, mds); + send_cap_msg(mdsc, vino.ino, CEPH_CAP_OP_RELEASE, 0, 0, seq, + size, 0, 0, NULL, NULL, 0, 0, mds); goto no_inode; } @@ -1127,73 +1300,10 @@ bad: return; } + /* - * called with i_lock, then drops it. - * caller should hold snap_rwsem, s_mutex. - * - * returns true if we removed the last cap on this inode. + * Delayed work handler to process end of delayed cap release LRU list. */ -int __ceph_send_cap(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session, - struct ceph_cap *cap, - int used, int wanted, - int flush_snap) __releases(cap->ci->vfs_inode->i_lock) -{ - struct ceph_inode_info *ci = cap->ci; - struct inode *inode = &ci->vfs_inode; - int revoking = cap->implemented & ~cap->issued; - int dropping = cap->issued & ~wanted; - int keep; - u64 seq, mseq, time_warp_seq, follows; - u64 size, max_size; - struct timespec mtime, atime; - int wake = 0; - int op = CEPH_CAP_OP_ACK; - - if (flush_snap) - op = CEPH_CAP_OP_FLUSHSNAP; - else if (wanted == 0) - op = CEPH_CAP_OP_RELEASE; - - dout(10, "__send_cap cap %p session %p %d -> %d\n", cap, cap->session, - cap->issued, cap->issued & wanted); - cap->issued &= wanted; /* drop bits we don't want */ - - if (revoking && (revoking && used) == 0) { - cap->implemented = cap->issued; - wake = 1; /* for waiters on wanted -> needed transition */ - } - - keep = cap->issued; - seq = cap->seq; - mseq = cap->mseq; - size = inode->i_size; - ci->i_reported_size = size; - max_size = ci->i_wanted_max_size; - ci->i_requested_max_size = max_size; - mtime = inode->i_mtime; - atime = inode->i_atime; - time_warp_seq = ci->i_time_warp_seq; - follows = ci->i_snap_realm->cached_context->seq; - spin_unlock(&inode->i_lock); - - if (dropping & CEPH_CAP_RDCACHE) { - /* invalidate what we can */ - dout(20, "invalidating pages on %p\n", inode); - invalidate_mapping_pages(&inode->i_data, 0, -1); - } - - send_cap(mdsc, ceph_vino(inode).ino, - op, keep, wanted, seq, mseq, - size, max_size, &mtime, &atime, time_warp_seq, - follows, session->s_mds); - - if (wake) - wake_up(&ci->i_cap_wq); - - return 0; -} - void ceph_check_delayed_caps(struct ceph_mds_client *mdsc) { struct ceph_inode_info *ci; @@ -1217,6 +1327,10 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc) spin_unlock(&mdsc->cap_delay_lock); } + +/* + * Force a flush of any write caps we hold. + */ void ceph_flush_write_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, int purge) @@ -1239,13 +1353,14 @@ void ceph_flush_write_caps(struct ceph_mds_client *mdsc, wanted = __ceph_caps_wanted(cap->ci); if (purge && (used || wanted)) { - derr(0, "residual caps on %p used %d wanted %d s=%llu wrb=%d\n", + derr(0, "residual caps on %p u %d w %d s=%llu wrb=%d\n", inode, used, wanted, inode->i_size, cap->ci->i_wrbuffer_ref); used = wanted = 0; } - __ceph_send_cap(mdsc, session, cap, used, wanted, 0); + /* __send_cap drops i_lock */ + __send_cap(mdsc, session, cap, used, wanted); } } diff --git a/src/kernel/super.h b/src/kernel/super.h index f0a288273776a..6b49ec4b816b9 100644 --- a/src/kernel/super.h +++ b/src/kernel/super.h @@ -650,14 +650,10 @@ extern int ceph_add_cap(struct inode *inode, unsigned cap, unsigned seq, void *snapblob, int snapblob_len, struct ceph_cap *new_cap); -extern int __ceph_remove_cap(struct ceph_cap *cap); extern void ceph_remove_cap(struct ceph_cap *cap); -extern void ceph_remove_all_caps(struct ceph_inode_info *ci); -extern int __ceph_get_cap_mds(struct ceph_inode_info *ci, u32 *mseq); extern int ceph_get_cap_mds(struct inode *inode); extern int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got, loff_t offset); -extern void ceph_take_cap_refs(struct ceph_inode_info *ci, int got); extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had); extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, struct ceph_snap_context *snapc); @@ -667,11 +663,6 @@ extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc); extern void ceph_flush_write_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, int purge); -extern int __ceph_send_cap(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session, - struct ceph_cap *cap, - int used, int wanted, - int flush_snap); /* addr.c */ extern const struct address_space_operations ceph_aops; -- 2.39.5