BUG_ON(!ci->i_snap_realm->cached_context);
if (page->private &&
(void *)page->private != ci->i_snap_realm->cached_context) {
- /* this page is already dirty in another (older) snap
- * context! is it writeable now? */
+ /*
+ * this page is already dirty in another (older) snap
+ * context! is it writeable now?
+ */
snapc = get_oldest_context(inode);
up_read(&mdsc->snap_rwsem);
if (snapc != (void *)page->private) {
dout(10, " page %p snapc %p not current or oldest\n",
page, (void *)page->private);
- /* queue for writeback, and wait for snapc
- * to be writeable or written */
+ /*
+ * queue for writeback, and wait for snapc to
+ * be writeable or written
+ */
snapc = ceph_get_snap_context((void *)page->private);
unlock_page(page);
if (ceph_queue_writeback(inode))
*
* Bump i_count when adding it's first cap.
*
- * Caller should hold session snap_rwsem, s_mutex.
+ * Caller should hold session snap_rwsem (read), s_mutex.
*
* @fmode can be negative, in which case it is ignored.
*/
}
/*
- * caller should hold i_lock, snap_rwsem, and session s_mutex.
+ * called under i_lock
+ */
+static int __ceph_is_any_caps(struct ceph_inode_info *ci)
+{
+ /* at least one cap is still linked into this inode's cap tree */
+ if (!RB_EMPTY_ROOT(&ci->i_caps))
+ return 1;
+
+ /* otherwise, true only if i_cap_exporting_mds names an mds (>= 0) */
+ return ci->i_cap_exporting_mds >= 0;
+}
+
+/*
+ * caller should hold i_lock, and session s_mutex.
* returns true if this is the last cap. if so, caller should iput.
*/
static int __ceph_remove_cap(struct ceph_cap *cap)
kfree(cap);
- if (RB_EMPTY_ROOT(&ci->i_caps)) {
+ if (!__ceph_is_any_caps(ci)) {
list_del_init(&ci->i_snap_realm_item);
ceph_put_snap_realm(mdsc, ci->i_snap_realm);
ci->i_snap_realm = NULL;
}
/*
- * caller should hold snap_rwsem and session s_mutex.
+ * caller should hold session s_mutex.
*/
void ceph_remove_cap(struct ceph_cap *cap)
{
/*
* Cancel delayed work on cap.
- * caller hold s_mutex, snap_rwsem.
+ * caller holds s_mutex
*/
static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
struct ceph_inode_info *ci)
* Note that this will leave behind any locked pages... FIXME!
*
* called with i_lock, then drops it.
- * caller should hold snap_rwsem, s_mutex.
+ * caller should hold snap_rwsem (read), s_mutex.
*/
static void __send_cap(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session,
}
ack:
- /* take snap_rwsem before session mutex */
- if (!took_snap_rwsem) {
- if (down_read_trylock(&mdsc->snap_rwsem) == 0) {
- dout(10, "inverting snap/in locks on %p\n",
- inode);
- spin_unlock(&inode->i_lock);
- down_read(&mdsc->snap_rwsem);
- took_snap_rwsem = 1;
- goto retry;
- }
- took_snap_rwsem = 1;
- }
if (session && session != cap->session) {
dout(30, "oops, wrong session %p mutex\n", session);
mutex_unlock(&session->s_mutex);
dout(10, "inverting session/ino locks on %p\n",
session);
spin_unlock(&inode->i_lock);
+ if (took_snap_rwsem) {
+ up_read(&mdsc->snap_rwsem);
+ took_snap_rwsem = 0;
+ }
mutex_lock(&session->s_mutex);
goto retry;
}
}
+ /* take snap_rwsem after session mutex */
+ if (!took_snap_rwsem) {
+ if (down_read_trylock(&mdsc->snap_rwsem) == 0) {
+ dout(10, "inverting snap/in locks on %p\n",
+ inode);
+ spin_unlock(&inode->i_lock);
+ down_read(&mdsc->snap_rwsem);
+ took_snap_rwsem = 1;
+ goto retry;
+ }
+ took_snap_rwsem = 1;
+ }
mds = cap->mds; /* remember mds, so we don't repeat */
* Handle a cap GRANT message from the MDS. (Note that a GRANT may
* actually be a revocation if it specifies a smaller cap set.)
*
- * caller holds s_mutex. NOT snap_rwsem.
+ * caller holds s_mutex.
* return value:
* 0 - ok
* 1 - send the msg back to mds
* Handle FLUSHSNAP_ACK. MDS has flushed snap data to disk and we can
* throw away our cap_snap.
*
- * Caller hold s_mutex, snap_rwsem.
+ * Caller holds s_mutex.
*/
static void handle_cap_flushsnap_ack(struct inode *inode,
struct ceph_mds_caps *m,
/*
* Handle TRUNC from MDS, indicating file truncation.
*
- * caller hold s_mutex, NOT snap_rwsem.
+ * caller holds s_mutex.
*/
static void handle_cap_trunc(struct inode *inode,
struct ceph_mds_caps *trunc,
* indicated by mseq), make note of the migrating cap bits for the
* duration (until we see the corresponding IMPORT).
*
- * caller holds s_mutex, snap_rwsem
+ * caller holds s_mutex
*/
static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
struct ceph_mds_session *session)
* Handle cap IMPORT. If there are temp bits from an older EXPORT,
* clean them up.
*
- * caller holds s_mutex, snap_rwsem
+ * caller holds s_mutex.
*/
static void handle_cap_import(struct ceph_mds_client *mdsc,
struct inode *inode, struct ceph_mds_caps *im,
unsigned seq = le32_to_cpu(im->seq);
unsigned mseq = le32_to_cpu(im->migrate_seq);
u64 realmino = le64_to_cpu(im->realm);
- struct ceph_snap_realm *realm;
unsigned long ttl_ms = le32_to_cpu(im->ttl_ms);
if (ci->i_cap_exporting_mds >= 0 &&
inode, ci, mds, mseq);
}
- realm = ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len,
- false);
+ down_write(&mdsc->snap_rwsem);
+ ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len,
+ false);
+ downgrade_write(&mdsc->snap_rwsem);
ceph_add_cap(inode, session, -1, issued, wanted, seq, mseq, realmino,
ttl_ms, jiffies - ttl_ms/2, NULL);
- ceph_put_snap_realm(mdsc, realm);
+ up_read(&mdsc->snap_rwsem);
}
* Handle a CEPH_CAPS message from the MDS.
*
* Identify the appropriate session, inode, and call the right handler
- * based on the cap op. Take read or write lock on snap_rwsem as
- * appropriate.
+ * based on the cap op.
*/
void ceph_handle_caps(struct ceph_mds_client *mdsc,
struct ceph_msg *msg)
/* find session */
mutex_lock(&mdsc->mutex);
session = __ceph_get_mds_session(mdsc, mds);
- if (session)
- down_write(&mdsc->snap_rwsem);
mutex_unlock(&mdsc->mutex);
if (!session) {
dout(10, "WTF, got cap but no session for mds%d\n", mds);
switch (op) {
case CEPH_CAP_OP_FLUSHSNAP_ACK:
handle_cap_flushsnap_ack(inode, h, session);
- up_write(&mdsc->snap_rwsem);
goto done;
case CEPH_CAP_OP_EXPORT:
handle_cap_export(inode, h, session);
- up_write(&mdsc->snap_rwsem);
if (list_empty(&session->s_caps))
ceph_mdsc_flushed_all_caps(mdsc, session);
goto done;
handle_cap_import(mdsc, inode, h, session,
msg->front.iov_base + sizeof(*h),
le32_to_cpu(h->snap_trace_len));
- up_write(&mdsc->snap_rwsem);
check_caps = 1; /* we may have sent a RELEASE to the old auth */
goto done;
}
/* note that each of these drops i_lock for us */
switch (op) {
case CEPH_CAP_OP_GRANT:
- up_write(&mdsc->snap_rwsem);
r = handle_cap_grant(inode, h, session, cap,&xattr_data);
if (r == 1) {
dout(10, " sending reply back to mds%d\n", mds);
case CEPH_CAP_OP_FLUSH_ACK:
handle_cap_flush_ack(inode, h, session, cap);
- up_write(&mdsc->snap_rwsem);
if (list_empty(&session->s_caps))
ceph_mdsc_flushed_all_caps(mdsc, session);
break;
case CEPH_CAP_OP_TRUNC:
- up_write(&mdsc->snap_rwsem);
handle_cap_trunc(inode, h, session);
break;
default:
spin_unlock(&inode->i_lock);
- up_write(&mdsc->snap_rwsem);
derr(10, " unknown cap op %d %s\n", op, ceph_cap_op_name(op));
}
return;
release:
- up_write(&mdsc->snap_rwsem);
send_cap_msg(mdsc, vino.ino, CEPH_CAP_OP_RELEASE,
0, 0, 0,
seq, 0,
*
* FIXME: we should check inode.version to avoid races between traces
* from multiple MDSs after, say, a ancestor directory is renamed.
+ *
+ * Called with snap_rwsem (read).
*/
int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
struct ceph_mds_session *session)
u64 tid;
int err, result;
int mds;
- struct ceph_snap_realm *realm = NULL;
if (le32_to_cpu(msg->hdr.src.name.type) != CEPH_ENTITY_TYPE_MDS)
return;
list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
}
- /* take the snap sem -- we may be are adding a cap here */
- down_write(&mdsc->snap_rwsem);
mutex_unlock(&mdsc->mutex);
mutex_lock(&req->r_session->s_mutex);
dout(10, "handle_reply tid %lld result %d\n", tid, result);
/* snap trace */
- if (rinfo->snapblob_len)
- realm = ceph_update_snap_trace(mdsc, rinfo->snapblob,
+ if (rinfo->snapblob_len) {
+ down_write(&mdsc->snap_rwsem);
+ ceph_update_snap_trace(mdsc, rinfo->snapblob,
rinfo->snapblob + rinfo->snapblob_len,
le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
+ downgrade_write(&mdsc->snap_rwsem);
+ } else {
+ down_read(&mdsc->snap_rwsem);
+ }
/* insert trace into our cache */
err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
done:
- if (realm)
- ceph_put_snap_realm(mdsc, realm);
- up_write(&mdsc->snap_rwsem);
+ up_read(&mdsc->snap_rwsem);
if (err) {
req->r_err = err;
/* find session */
session = __ceph_get_mds_session(mdsc, mds);
- down_read(&mdsc->snap_rwsem);
mutex_unlock(&mdsc->mutex); /* drop lock for duration */
if (session) {
mds);
}
+ down_read(&mdsc->snap_rwsem);
+
retry:
/* build reply */
reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, len, 0, 0, NULL);
}
out:
+ up_read(&mdsc->snap_rwsem);
if (session) {
mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session);
}
- up_read(&mdsc->snap_rwsem);
mutex_lock(&mdsc->mutex);
return;
mdsc->stopping = 0;
init_rwsem(&mdsc->snap_rwsem);
INIT_RADIX_TREE(&mdsc->snap_realms, GFP_NOFS);
+ INIT_LIST_HEAD(&mdsc->snap_empty);
+ spin_lock_init(&mdsc->snap_empty_lock);
mdsc->last_tid = 0;
INIT_RADIX_TREE(&mdsc->request_tree, GFP_NOFS);
INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
mutex_unlock(&mdsc->mutex);
+ ceph_cleanup_empty_realms(mdsc);
+
cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
dout(10, "stopped\n");
*
* session->s_mutex
* mdsc->mutex
- * mdsc->snap_rwsem
+ *
+ * mdsc->snap_rwsem
*
* inode->i_lock
* mdsc->snap_flush_lock
* mdsc->cap_delay_lock
*
+ *
*/
struct ceph_client;
int stopping; /* true if shutting down */
/*
- * snap_rwsem will cover cap linkage into snaprealms, and realm
- * snap contexts. (later, we can do per-realm snap contexts locks..)
+ * snap_rwsem will cover cap linkage into snaprealms, and
+ * realm snap contexts. (later, we can do per-realm snap
+ * contexts locks..) the empty list contains realms with no
+ * references (implying they contain no inodes with caps) that
+ * should be destroyed.
*/
struct rw_semaphore snap_rwsem;
struct radix_tree_root snap_realms;
+ struct list_head snap_empty;
+ spinlock_t snap_empty_lock; /* protect snap_empty */
u64 last_tid; /* most recent mds request */
struct radix_tree_root request_tree; /* pending mds requests */
* realm, which simply lists the resulting set of snaps for the realm. This
* is attached to any writes sent to OSDs.
*/
+/*
+ * Unfortunately error handling is a bit mixed here. If we get a snap
+ * update, but don't have enough memory to update our realm hierarchy,
+ * it's not clear what we can do about it (besides complaining to the
+ * console).
+ */
/*
*
+ * take a reference on @realm. if its ref count is currently zero,
+ * unlink it from the empty list (under snap_empty_lock) first.
+ *
* caller must hold snap_rwsem for write.
*/
-static void get_realm(struct ceph_snap_realm *realm)
+static void get_realm(struct ceph_mds_client *mdsc,
+ struct ceph_snap_realm *realm)
{
- realm->nref++;
-}
+ /*
+ * since we _only_ increment realm refs or empty the empty
+ * list with snap_rwsem held, adjusting the empty list here is
+ * safe. we do need to protect against concurrent empty list
+ * additions, however.
+ */
+ if (atomic_read(&realm->nref) == 0) {
+ spin_lock(&mdsc->snap_empty_lock);
+ list_del_init(&realm->empty_item);
+ spin_unlock(&mdsc->snap_empty_lock);
+ }
-/*
- * Unfortunately error handling is a bit mixed here. If we get a snap
- * update, but don't have enough memory to update our realm hierarchy,
- * it's not clear what we can do about it (besides complaining to the
- * console).
- */
+ atomic_inc(&realm->nref);
+}
/*
* create and get the realm rooted at @ino and bump its ref count.
* caller must hold snap_rwsem for write.
*/
struct ceph_snap_realm *ceph_create_snap_realm(struct ceph_mds_client *mdsc,
- u64 ino)
+ u64 ino)
{
struct ceph_snap_realm *realm;
realm = kzalloc(sizeof(*realm), GFP_NOFS);
if (!realm)
return ERR_PTR(-ENOMEM);
+
radix_tree_insert(&mdsc->snap_realms, ino, realm);
- realm->nref = 0; /* tree does not take a ref */
+
+ atomic_set(&realm->nref, 0); /* tree does not take a ref */
realm->ino = ino;
INIT_LIST_HEAD(&realm->children);
INIT_LIST_HEAD(&realm->child_item);
+ INIT_LIST_HEAD(&realm->empty_item);
INIT_LIST_HEAD(&realm->inodes_with_caps);
dout(20, "create_snap_realm %llx %p\n", realm->ino, realm);
return realm;
realm = radix_tree_lookup(&mdsc->snap_realms, ino);
if (realm) {
dout(20, "get_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
- realm->nref, realm->nref+1);
- get_realm(realm);
+ atomic_read(&realm->nref), atomic_read(&realm->nref)+1);
+ get_realm(mdsc, realm);
}
return realm;
}
+static void __put_snap_realm(struct ceph_mds_client *mdsc,
+ struct ceph_snap_realm *realm);
+
+/*
+ * Tear down @realm: remove it from the realm radix tree, unlink it
+ * from its parent (dropping the ref held on that parent), release its
+ * snap arrays and cached context, and free it.
+ *
+ * called with snap_rwsem (write)
+ */
+static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
+ struct ceph_snap_realm *realm)
+{
+ struct ceph_snap_realm *parent = realm->parent;
+
+ dout(10, "__destroy_snap_realm %p %llx\n", realm, realm->ino);
+
+ radix_tree_delete(&mdsc->snap_realms, realm->ino);
+
+ if (parent) {
+ list_del_init(&realm->child_item);
+ __put_snap_realm(mdsc, parent);
+ }
+
+ ceph_put_snap_context(realm->cached_context);
+ kfree(realm->snaps);
+ kfree(realm->prior_parent_snaps);
+ kfree(realm);
+}
+
+/*
+ * Drop one reference on @realm; destroy the realm if that was the
+ * last reference.
+ *
+ * caller holds snap_rwsem (write)
+ */
+static void __put_snap_realm(struct ceph_mds_client *mdsc,
+ struct ceph_snap_realm *realm)
+{
+ dout(20, "__put_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
+ atomic_read(&realm->nref), atomic_read(&realm->nref)-1);
+
+ /* other references remain: nothing more to do */
+ if (!atomic_dec_and_test(&realm->nref))
+ return;
+
+ __destroy_snap_realm(mdsc, realm);
+}
+
/*
- * caller must hold snap_rwsem for write
+ * Drop a reference on @realm. If it was the last one, destroy the
+ * realm immediately when snap_rwsem can be taken for write without
+ * blocking; otherwise queue it on mdsc->snap_empty so that a later
+ * __cleanup_empty_realms() pass (run under snap_rwsem write) can
+ * destroy it.
+ *
+ * caller needn't hold any locks
*/
void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
struct ceph_snap_realm *realm)
{
dout(20, "put_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
- realm->nref, realm->nref-1);
- realm->nref--;
- if (realm->nref == 0) {
- if (realm->parent) {
- list_del_init(&realm->child_item);
- ceph_put_snap_realm(mdsc, realm->parent);
- }
- radix_tree_delete(&mdsc->snap_realms, realm->ino);
- kfree(realm->prior_parent_snaps);
- kfree(realm->snaps);
- ceph_put_snap_context(realm->cached_context);
- kfree(realm);
+ atomic_read(&realm->nref), atomic_read(&realm->nref)-1);
+ if (!atomic_dec_and_test(&realm->nref))
+ return;
+
+ if (down_write_trylock(&mdsc->snap_rwsem)) {
+ __destroy_snap_realm(mdsc, realm);
+ up_write(&mdsc->snap_rwsem);
+ } else {
+ spin_lock(&mdsc->snap_empty_lock);
+ /*
+ * list_add() takes (new, head): link the realm's
+ * empty_item onto the snap_empty list, not the other
+ * way around.
+ */
+ list_add(&realm->empty_item, &mdsc->snap_empty);
+ spin_unlock(&mdsc->snap_empty_lock);
+ }
+}
+
+/*
+ * Clean up any realms whose ref counts have dropped to zero. Note
+ * that this does not include realms that were created but not yet
+ * used.
+ *
+ * Called under snap_rwsem (write)
+ */
+static void __cleanup_empty_realms(struct ceph_mds_client *mdsc)
+{
+ struct ceph_snap_realm *realm;
+
+ spin_lock(&mdsc->snap_empty_lock);
+ while (!list_empty(&mdsc->snap_empty)) {
+ /*
+ * take the first queued realm; applying list_entry() to
+ * the list head itself would yield a bogus pointer.
+ */
+ realm = list_first_entry(&mdsc->snap_empty,
+ struct ceph_snap_realm, empty_item);
+ list_del(&realm->empty_item);
+ /* snap_empty_lock is not held across the destroy call */
+ spin_unlock(&mdsc->snap_empty_lock);
+ __destroy_snap_realm(mdsc, realm);
+ spin_lock(&mdsc->snap_empty_lock);
}
+ spin_unlock(&mdsc->snap_empty_lock);
+}
+
+/*
+ * Locked wrapper around __cleanup_empty_realms(): takes snap_rwsem
+ * for write for the duration of the cleanup and releases it after.
+ */
+void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc)
+{
+ down_write(&mdsc->snap_rwsem);
+ __cleanup_empty_realms(mdsc);
+ up_write(&mdsc->snap_rwsem);
}
/*
}
realm->parent_ino = parentino;
realm->parent = parent;
- get_realm(parent);
+ get_realm(mdsc, parent);
list_add(&realm->child_item, &parent->children);
return 1;
}
*
* Caller must hold snap_rwsem for write.
*/
-struct ceph_snap_realm *ceph_update_snap_trace(struct ceph_mds_client *mdsc,
- void *p, void *e, bool deletion)
+int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
+ void *p, void *e, bool deletion)
{
struct ceph_mds_snap_realm *ri; /* encoded */
__le64 *snaps; /* encoded */
__le64 *prior_parent_snaps; /* encoded */
- struct ceph_snap_realm *realm, *first = NULL;
+ struct ceph_snap_realm *realm;
int invalidate = 0;
int err = -ENOMEM;
goto fail;
}
}
- if (!first) {
- /* take note if this is the first realm in the trace
- * (the most deeply nested)... we will return if (with
- * nref bumped) to the caller. */
- first = realm;
- get_realm(realm);
- }
if (le64_to_cpu(ri->seq) > realm->seq) {
dout(10, "update_snap_trace updating %llx %p %lld -> %lld\n",
if (p < e)
goto more;
- return first;
+ __cleanup_empty_realms(mdsc);
+
+ return 0;
bad:
err = -EINVAL;
fail:
derr(10, "update_snap_trace error %d\n", err);
- return ERR_PTR(err);
+ return err;
}
/* find session */
mutex_lock(&mdsc->mutex);
session = __ceph_get_mds_session(mdsc, mds);
- if (session)
- down_write(&mdsc->snap_rwsem);
mutex_unlock(&mdsc->mutex);
if (!session) {
dout(10, "WTF, got snap but no session for mds%d\n", mds);
return;
}
- locked_rwsem = 1;
mutex_lock(&session->s_mutex);
session->s_seq++;
mutex_unlock(&session->s_mutex);
+ down_write(&mdsc->snap_rwsem);
+ locked_rwsem = 1;
+
if (op == CEPH_SNAP_OP_SPLIT) {
struct ceph_mds_snap_realm *ri;
realm = ceph_create_snap_realm(mdsc, split);
if (IS_ERR(realm))
goto out;
- get_realm(realm);
+ get_realm(mdsc, realm);
}
dout(10, "splitting snap_realm %llx %p\n", realm->ino, realm);
* update using the provided snap trace. if we are deleting a
* snap, we can avoid queueing cap_snaps.
*/
- realm = ceph_update_snap_trace(mdsc, p, e,
- op == CEPH_SNAP_OP_DESTROY);
- if (IS_ERR(realm))
- goto bad;
+ ceph_update_snap_trace(mdsc, p, e,
+ op == CEPH_SNAP_OP_DESTROY);
if (op == CEPH_SNAP_OP_SPLIT) {
/*
list_add(&ci->i_snap_realm_item,
&realm->inodes_with_caps);
ci->i_snap_realm = realm;
- get_realm(realm);
+ get_realm(mdsc, realm);
split_skip_inode:
spin_unlock(&inode->i_lock);
iput(inode);
ceph_put_snap_realm(mdsc, realm);
}
- ceph_put_snap_realm(mdsc, realm);
+ __cleanup_empty_realms(mdsc);
+
up_write(&mdsc->snap_rwsem);
flush_snaps(mdsc);
*/
struct ceph_snap_realm {
u64 ino;
- int nref;
+ atomic_t nref;
u64 created, seq;
u64 parent_ino;
u64 parent_since; /* snapid when our current parent became so */
struct list_head children; /* list of child realms */
struct list_head child_item;
+ struct list_head empty_item; /* if i have ref==0 */
+
/* the current set of snaps for this realm */
struct ceph_snap_context *cached_context;
u64 ino);
extern void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
struct ceph_snap_realm *realm);
-extern struct ceph_snap_realm *ceph_update_snap_trace(struct ceph_mds_client *m,
- void *p, void *e,
- bool deletion);
+extern int ceph_update_snap_trace(struct ceph_mds_client *m,
+ void *p, void *e, bool deletion);
extern void ceph_handle_snap(struct ceph_mds_client *mdsc,
struct ceph_msg *msg);
extern void ceph_queue_cap_snap(struct ceph_inode_info *ci,
struct ceph_snap_context *snapc);
extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
struct ceph_cap_snap *capsnap);
+extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
/*
* a cap_snap is "pending" if it is still awaiting an in-progress