]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: improve snap locking scheme
authorSage Weil <sage@newdream.net>
Wed, 21 Jan 2009 00:35:43 +0000 (16:35 -0800)
committerSage Weil <sage@newdream.net>
Wed, 21 Jan 2009 00:35:43 +0000 (16:35 -0800)
Because snap_rwsem is taken for read on every write, we want to
minimize contention.  We:

- allow cap removal to occur without snap_rwsem (read or write)
- allow fill_trace and add_cap with snap_rwsem (read)
- require write really only for update_snap_trace and on snap
  updates from the mds.

Dropping realm references without snap_rwsem gets a bit tricky.  We
keep an empty (nref==0) realm list and purge it periodically to
avoid contention in the general case.

src/kernel/addr.c
src/kernel/caps.c
src/kernel/inode.c
src/kernel/mds_client.c
src/kernel/mds_client.h
src/kernel/snap.c
src/kernel/super.h

index 0bf78bdc5c84213c4a773d5f4e46083129df8547..5d3cdb56c9fa3c09f289675db5dc2bfc60ce9a6c 100644 (file)
@@ -847,16 +847,20 @@ retry_locked:
        BUG_ON(!ci->i_snap_realm->cached_context);
        if (page->private &&
            (void *)page->private != ci->i_snap_realm->cached_context) {
-               /* this page is already dirty in another (older) snap
-                * context!  is it writeable now? */
+               /*
+                * this page is already dirty in another (older) snap
+                * context!  is it writeable now?
+                */
                snapc = get_oldest_context(inode);
                up_read(&mdsc->snap_rwsem);
 
                if (snapc != (void *)page->private) {
                        dout(10, " page %p snapc %p not current or oldest\n",
                             page, (void *)page->private);
-                       /* queue for writeback, and wait for snapc
-                        * to be writeable or written */
+                       /*
+                        * queue for writeback, and wait for snapc to
+                        * be writeable or written
+                        */
                        snapc = ceph_get_snap_context((void *)page->private);
                        unlock_page(page);
                        if (ceph_queue_writeback(inode))
index 747600670f2037a45291f606f485091c7adae42a..db02a29450a2903fe0e65aa94c628d2643376ea3 100644 (file)
@@ -171,7 +171,7 @@ static void __insert_cap_node(struct ceph_inode_info *ci,
  *
  * Bump i_count when adding it's first cap.
  *
- * Caller should hold session snap_rwsem, s_mutex.
+ * Caller should hold session snap_rwsem (read), s_mutex.
  *
  * @fmode can be negative, in which case it is ignored.
  */
@@ -345,7 +345,15 @@ int __ceph_caps_dirty(struct ceph_inode_info *ci)
 }
 
 /*
- * caller should hold i_lock, snap_rwsem, and session s_mutex.
+ * called under i_lock
+ */
+static int __ceph_is_any_caps(struct ceph_inode_info *ci)
+{
+       return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_mds >= 0;
+}
+
+/*
+ * caller should hold i_lock, and session s_mutex.
  * returns true if this is the last cap.  if so, caller should iput.
  */
 static int __ceph_remove_cap(struct ceph_cap *cap)
@@ -368,7 +376,7 @@ static int __ceph_remove_cap(struct ceph_cap *cap)
 
        kfree(cap);
 
-       if (RB_EMPTY_ROOT(&ci->i_caps)) {
+       if (!__ceph_is_any_caps(ci)) {
                list_del_init(&ci->i_snap_realm_item);
                ceph_put_snap_realm(mdsc, ci->i_snap_realm);
                ci->i_snap_realm = NULL;
@@ -378,7 +386,7 @@ static int __ceph_remove_cap(struct ceph_cap *cap)
 }
 
 /*
- * caller should hold snap_rwsem and session s_mutex.
+ * caller should hold session s_mutex.
  */
 void ceph_remove_cap(struct ceph_cap *cap)
 {
@@ -418,7 +426,7 @@ static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
 
 /*
  * Cancel delayed work on cap.
- * caller hold s_mutex, snap_rwsem.
+ * caller hold s_mutex
  */
 static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
                               struct ceph_inode_info *ci)
@@ -496,7 +504,7 @@ static void send_cap_msg(struct ceph_mds_client *mdsc, u64 ino, int op,
  * Note that this will leave behind any locked pages... FIXME!
  *
  * called with i_lock, then drops it.
- * caller should hold snap_rwsem, s_mutex.
+ * caller should hold snap_rwsem (read), s_mutex.
  */
 static void __send_cap(struct ceph_mds_client *mdsc,
                       struct ceph_mds_session *session,
@@ -858,18 +866,6 @@ retry_locked:
                }
 
 ack:
-               /* take snap_rwsem before session mutex */
-               if (!took_snap_rwsem) {
-                       if (down_read_trylock(&mdsc->snap_rwsem) == 0) {
-                               dout(10, "inverting snap/in locks on %p\n",
-                                    inode);
-                               spin_unlock(&inode->i_lock);
-                               down_read(&mdsc->snap_rwsem);
-                               took_snap_rwsem = 1;
-                               goto retry;
-                       }
-                       took_snap_rwsem = 1;
-               }
                if (session && session != cap->session) {
                        dout(30, "oops, wrong session %p mutex\n", session);
                        mutex_unlock(&session->s_mutex);
@@ -881,10 +877,26 @@ ack:
                                dout(10, "inverting session/ino locks on %p\n",
                                     session);
                                spin_unlock(&inode->i_lock);
+                               if (took_snap_rwsem) {
+                                       up_read(&mdsc->snap_rwsem);
+                                       took_snap_rwsem = 0;
+                               }
                                mutex_lock(&session->s_mutex);
                                goto retry;
                        }
                }
+               /* take snap_rwsem after session mutex */
+               if (!took_snap_rwsem) {
+                       if (down_read_trylock(&mdsc->snap_rwsem) == 0) {
+                               dout(10, "inverting snap/in locks on %p\n",
+                                    inode);
+                               spin_unlock(&inode->i_lock);
+                               down_read(&mdsc->snap_rwsem);
+                               took_snap_rwsem = 1;
+                               goto retry;
+                       }
+                       took_snap_rwsem = 1;
+               }
 
                mds = cap->mds;  /* remember mds, so we don't repeat */
 
@@ -1114,7 +1126,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
  * Handle a cap GRANT message from the MDS.  (Note that a GRANT may
  * actually be a revocation if it specifies a smaller cap set.)
  *
- * caller holds s_mutex.  NOT snap_rwsem.
+ * caller holds s_mutex.
  * return value:
  *  0 - ok
  *  1 - send the msg back to mds
@@ -1387,7 +1399,7 @@ static void handle_cap_flush_ack(struct inode *inode,
  * Handle FLUSHSNAP_ACK.  MDS has flushed snap data to disk and we can
  * throw away our cap_snap.
  *
- * Caller hold s_mutex, snap_rwsem.
+ * Caller hold s_mutex.
  */
 static void handle_cap_flushsnap_ack(struct inode *inode,
                                     struct ceph_mds_caps *m,
@@ -1428,7 +1440,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode,
 /*
  * Handle TRUNC from MDS, indicating file truncation.
  *
- * caller hold s_mutex, NOT snap_rwsem.
+ * caller hold s_mutex.
  */
 static void handle_cap_trunc(struct inode *inode,
                             struct ceph_mds_caps *trunc,
@@ -1481,7 +1493,7 @@ static void handle_cap_trunc(struct inode *inode,
  * indicated by mseq), make note of the migrating cap bits for the
  * duration (until we see the corresponding IMPORT).
  *
- * caller holds s_mutex, snap_rwsem
+ * caller holds s_mutex
  */
 static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
                              struct ceph_mds_session *session)
@@ -1532,7 +1544,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
  * Handle cap IMPORT.  If there are temp bits from an older EXPORT,
  * clean them up.
  *
- * caller holds s_mutex, snap_rwsem
+ * caller holds s_mutex.
  */
 static void handle_cap_import(struct ceph_mds_client *mdsc, 
                              struct inode *inode, struct ceph_mds_caps *im,
@@ -1546,7 +1558,6 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
        unsigned seq = le32_to_cpu(im->seq);
        unsigned mseq = le32_to_cpu(im->migrate_seq);
        u64 realmino = le64_to_cpu(im->realm);
-       struct ceph_snap_realm *realm;
        unsigned long ttl_ms = le32_to_cpu(im->ttl_ms);
 
        if (ci->i_cap_exporting_mds >= 0 &&
@@ -1563,11 +1574,13 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
                     inode, ci, mds, mseq);
        }
        
-       realm = ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len,
-                                      false);
+       down_write(&mdsc->snap_rwsem);
+       ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len,
+                              false);
+       downgrade_write(&mdsc->snap_rwsem);
        ceph_add_cap(inode, session, -1, issued, wanted, seq, mseq, realmino,
                     ttl_ms, jiffies - ttl_ms/2, NULL);
-       ceph_put_snap_realm(mdsc, realm);
+       up_read(&mdsc->snap_rwsem);
 }
 
 
@@ -1575,8 +1588,7 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
  * Handle a CEPH_CAPS message from the MDS.
  *
  * Identify the appropriate session, inode, and call the right handler
- * based on the cap op.  Take read or write lock on snap_rwsem as
- * appropriate.
+ * based on the cap op.
  */
 void ceph_handle_caps(struct ceph_mds_client *mdsc,
                      struct ceph_msg *msg)
@@ -1611,8 +1623,6 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc,
        /* find session */
        mutex_lock(&mdsc->mutex);
        session = __ceph_get_mds_session(mdsc, mds);
-       if (session)
-               down_write(&mdsc->snap_rwsem);
        mutex_unlock(&mdsc->mutex);
        if (!session) {
                dout(10, "WTF, got cap but no session for mds%d\n", mds);
@@ -1636,12 +1646,10 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc,
        switch (op) {
        case CEPH_CAP_OP_FLUSHSNAP_ACK:
                handle_cap_flushsnap_ack(inode, h, session);
-               up_write(&mdsc->snap_rwsem);
                goto done;
 
        case CEPH_CAP_OP_EXPORT:
                handle_cap_export(inode, h, session);
-               up_write(&mdsc->snap_rwsem);
                if (list_empty(&session->s_caps))
                        ceph_mdsc_flushed_all_caps(mdsc, session);
                goto done;
@@ -1650,7 +1658,6 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc,
                handle_cap_import(mdsc, inode, h, session,
                                  msg->front.iov_base + sizeof(*h),
                                  le32_to_cpu(h->snap_trace_len));
-               up_write(&mdsc->snap_rwsem);
                check_caps = 1; /* we may have sent a RELEASE to the old auth */
                goto done;
        }
@@ -1672,7 +1679,6 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc,
        /* note that each of these drops i_lock for us */
        switch (op) {
        case CEPH_CAP_OP_GRANT:
-               up_write(&mdsc->snap_rwsem);
                r = handle_cap_grant(inode, h, session, cap,&xattr_data);
                if (r == 1) {
                        dout(10, " sending reply back to mds%d\n", mds);
@@ -1686,19 +1692,16 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc,
 
        case CEPH_CAP_OP_FLUSH_ACK:
                handle_cap_flush_ack(inode, h, session, cap);
-               up_write(&mdsc->snap_rwsem);
                if (list_empty(&session->s_caps))
                        ceph_mdsc_flushed_all_caps(mdsc, session);
                break;
 
        case CEPH_CAP_OP_TRUNC:
-               up_write(&mdsc->snap_rwsem);
                handle_cap_trunc(inode, h, session);
                break;
 
        default:
                spin_unlock(&inode->i_lock);
-               up_write(&mdsc->snap_rwsem);
                derr(10, " unknown cap op %d %s\n", op, ceph_cap_op_name(op));
        }
 
@@ -1719,7 +1722,6 @@ bad:
        return;
 
 release:
-       up_write(&mdsc->snap_rwsem);
        send_cap_msg(mdsc, vino.ino, CEPH_CAP_OP_RELEASE,
                     0, 0, 0,
                     seq, 0,
index da53fb2b078ffb7599f2b6168ae8eebfab9d2d8e..6eaea34cf91ea4b3dd8b70574e3291944968e776 100644 (file)
@@ -754,6 +754,8 @@ out:
  *
  * FIXME: we should check inode.version to avoid races between traces
  * from multiple MDSs after, say, a ancestor directory is renamed.
+ *
+ * Called with snap_rwsem (read).
  */
 int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                    struct ceph_mds_session *session)
index 24b65821b9a8e63caa8a71da326c27885cdfb753..60624985a8fc909154a89ff9f0ca519a8cf769b5 100644 (file)
@@ -1262,7 +1262,6 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
        u64 tid;
        int err, result;
        int mds;
-       struct ceph_snap_realm *realm = NULL;
 
        if (le32_to_cpu(msg->hdr.src.name.type) != CEPH_ENTITY_TYPE_MDS)
                return;
@@ -1332,8 +1331,6 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
                list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
        }
 
-       /* take the snap sem -- we may be are adding a cap here */
-       down_write(&mdsc->snap_rwsem);
        mutex_unlock(&mdsc->mutex);
 
        mutex_lock(&req->r_session->s_mutex);
@@ -1349,10 +1346,15 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
        dout(10, "handle_reply tid %lld result %d\n", tid, result);
 
        /* snap trace */
-       if (rinfo->snapblob_len)
-               realm = ceph_update_snap_trace(mdsc, rinfo->snapblob,
+       if (rinfo->snapblob_len) {
+               down_write(&mdsc->snap_rwsem);
+               ceph_update_snap_trace(mdsc, rinfo->snapblob,
                               rinfo->snapblob + rinfo->snapblob_len,
                               le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
+               downgrade_write(&mdsc->snap_rwsem);
+       } else {
+               down_read(&mdsc->snap_rwsem);
+       }
 
        /* insert trace into our cache */
        err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
@@ -1366,9 +1368,7 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 
 
 done:
-       if (realm)
-               ceph_put_snap_realm(mdsc, realm);
-       up_write(&mdsc->snap_rwsem);
+       up_read(&mdsc->snap_rwsem);
 
        if (err) {
                req->r_err = err;
@@ -1508,7 +1508,6 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
 
        /* find session */
        session = __ceph_get_mds_session(mdsc, mds);
-       down_read(&mdsc->snap_rwsem);
        mutex_unlock(&mdsc->mutex);    /* drop lock for duration */
 
        if (session) {
@@ -1531,6 +1530,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
                     mds);
        }
 
+       down_read(&mdsc->snap_rwsem);
+
 retry:
        /* build reply */
        reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, len, 0, 0, NULL);
@@ -1657,11 +1658,11 @@ send:
        }
 
 out:
+       up_read(&mdsc->snap_rwsem);
        if (session) {
                mutex_unlock(&session->s_mutex);
                ceph_put_mds_session(session);
        }
-       up_read(&mdsc->snap_rwsem);
        mutex_lock(&mdsc->mutex);
        return;
 
@@ -1996,6 +1997,8 @@ void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
        mdsc->stopping = 0;
        init_rwsem(&mdsc->snap_rwsem);
        INIT_RADIX_TREE(&mdsc->snap_realms, GFP_NOFS);
+       INIT_LIST_HEAD(&mdsc->snap_empty);
+       spin_lock_init(&mdsc->snap_empty_lock); 
        mdsc->last_tid = 0;
        INIT_RADIX_TREE(&mdsc->request_tree, GFP_NOFS);
        INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
@@ -2110,6 +2113,8 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
 
        mutex_unlock(&mdsc->mutex);
 
+       ceph_cleanup_empty_realms(mdsc);
+
        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
 
        dout(10, "stopped\n");
index 02f0710e09fcbcdee1d5b2226d58bc76ba6ffb0d..7f93791fe7305d1bc61c4cd73e4a8619597c3464 100644 (file)
  *
  * session->s_mutex
  *         mdsc->mutex
- *                 mdsc->snap_rwsem
+ *
+ *         mdsc->snap_rwsem
  *
  *         inode->i_lock
  *                 mdsc->snap_flush_lock
  *                 mdsc->cap_delay_lock
  *
+ *
  */
 
 struct ceph_client;
@@ -194,11 +196,16 @@ struct ceph_mds_client {
        int                     stopping;      /* true if shutting down */
 
        /*
-        * snap_rwsem will cover cap linkage into snaprealms, and realm
-        * snap contexts.  (later, we can do per-realm snap contexts locks..)
+        * snap_rwsem will cover cap linkage into snaprealms, and
+        * realm snap contexts.  (later, we can do per-realm snap
+        * contexts locks..)  the empty list contains realms with no
+        * references (implying they contain no inodes with caps) that
+        * should be destroyed.
         */
        struct rw_semaphore     snap_rwsem;
        struct radix_tree_root  snap_realms;
+       struct list_head        snap_empty;
+       spinlock_t              snap_empty_lock;  /* protect snap_empty */
 
        u64                    last_tid;      /* most recent mds request */
        struct radix_tree_root request_tree;  /* pending mds requests */
index 13cf1b8b91ef0d9f7378f6f92c0ca514dc986453..12043e38203a90329d0cb371c3d7ca486440b43b 100644 (file)
@@ -49,6 +49,12 @@ int ceph_debug_snap = -1;
  * realm, which simply lists the resulting set of snaps for the realm.  This
  * is attached to any writes sent to OSDs.
  */
+/*
+ * Unfortunately error handling is a bit mixed here.  If we get a snap
+ * update, but don't have enough memory to update our realm hierarchy,
+ * it's not clear what we can do about it (besides complaining to the
+ * console).
+ */
 
 
 /*
@@ -56,17 +62,23 @@ int ceph_debug_snap = -1;
  *
  * caller must hold snap_rwsem for write.
  */
-static void get_realm(struct ceph_snap_realm *realm)
+static void get_realm(struct ceph_mds_client *mdsc,
+                     struct ceph_snap_realm *realm)
 {
-       realm->nref++;
-}
+       /*
+        * since we _only_ increment realm refs or empty the empty
+        * list with snap_rwsem held, adjusting the empty list here is
+        * safe.  we do need to protect against concurrent empty list
+        * additions, however.
+        */
+       if (atomic_read(&realm->nref) == 0) {
+               spin_lock(&mdsc->snap_empty_lock);
+               list_del_init(&realm->empty_item);
+               spin_unlock(&mdsc->snap_empty_lock);
+       }
 
-/*
- * Unfortunately error handling is a bit mixed here.  If we get a snap
- * update, but don't have enough memory to update our realm hierarchy,
- * it's not clear what we can do about it (besides complaining to the
- * console).
- */
+       atomic_inc(&realm->nref);
+}
 
 /*
  * create and get the realm rooted at @ino and bump its ref count.
@@ -74,18 +86,21 @@ static void get_realm(struct ceph_snap_realm *realm)
  * caller must hold snap_rwsem for write.
  */
 struct ceph_snap_realm *ceph_create_snap_realm(struct ceph_mds_client *mdsc,
-                                           u64 ino)
+                                              u64 ino)
 {
        struct ceph_snap_realm *realm;
 
        realm = kzalloc(sizeof(*realm), GFP_NOFS);
        if (!realm)
                return ERR_PTR(-ENOMEM);
+
        radix_tree_insert(&mdsc->snap_realms, ino, realm);
-       realm->nref = 0;    /* tree does not take a ref */
+
+       atomic_set(&realm->nref, 0);    /* tree does not take a ref */
        realm->ino = ino;
        INIT_LIST_HEAD(&realm->children);
        INIT_LIST_HEAD(&realm->child_item);
+       INIT_LIST_HEAD(&realm->empty_item);
        INIT_LIST_HEAD(&realm->inodes_with_caps);
        dout(20, "create_snap_realm %llx %p\n", realm->ino, realm);
        return realm;
@@ -104,32 +119,97 @@ struct ceph_snap_realm *ceph_get_snap_realm(struct ceph_mds_client *mdsc,
        realm = radix_tree_lookup(&mdsc->snap_realms, ino);
        if (realm) {
                dout(20, "get_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
-                    realm->nref, realm->nref+1);
-               get_realm(realm);
+                    atomic_read(&realm->nref), atomic_read(&realm->nref)+1);
+               get_realm(mdsc, realm);
        }
        return realm;
 }
 
+static void __put_snap_realm(struct ceph_mds_client *mdsc,
+                            struct ceph_snap_realm *realm);
+
+/*
+ * called with snap_rwsem (write)
+ */
+static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
+                                struct ceph_snap_realm *realm)
+{
+       dout(10, "__destroy_snap_realm %p %llx\n", realm, realm->ino);
+
+       radix_tree_delete(&mdsc->snap_realms, realm->ino);
+       
+       if (realm->parent) {
+               list_del_init(&realm->child_item);
+               __put_snap_realm(mdsc, realm->parent);
+       }
+       
+       kfree(realm->prior_parent_snaps);
+       kfree(realm->snaps);
+       ceph_put_snap_context(realm->cached_context);
+       kfree(realm);
+}
+
+/*
+ * caller holds snap_rwsem (write)
+ */
+static void __put_snap_realm(struct ceph_mds_client *mdsc,
+                            struct ceph_snap_realm *realm)
+{
+       dout(20, "__put_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
+            atomic_read(&realm->nref), atomic_read(&realm->nref)-1);
+       if (atomic_dec_and_test(&realm->nref))
+               __destroy_snap_realm(mdsc, realm);
+}
+
 /*
- * caller must hold snap_rwsem for write
+ * caller needn't hold any locks
  */
 void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
                         struct ceph_snap_realm *realm)
 {
        dout(20, "put_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
-            realm->nref, realm->nref-1);
-       realm->nref--;
-       if (realm->nref == 0) {
-               if (realm->parent) {
-                       list_del_init(&realm->child_item);
-                       ceph_put_snap_realm(mdsc, realm->parent);
-               }
-               radix_tree_delete(&mdsc->snap_realms, realm->ino);
-               kfree(realm->prior_parent_snaps);
-               kfree(realm->snaps);
-               ceph_put_snap_context(realm->cached_context);
-               kfree(realm);
+            atomic_read(&realm->nref), atomic_read(&realm->nref)-1);
+       if (!atomic_dec_and_test(&realm->nref))
+               return;
+
+       if (down_write_trylock(&mdsc->snap_rwsem)) {
+               __destroy_snap_realm(mdsc, realm);
+               up_write(&mdsc->snap_rwsem);
+       } else {
+               spin_lock(&mdsc->snap_empty_lock);
+               list_add(&mdsc->snap_empty, &realm->empty_item);
+               spin_unlock(&mdsc->snap_empty_lock);
+       }
+}
+
+/*
+ * Clean up any realms whose ref counts have dropped to zero.  Note
+ * that this does not include realms who were created but not yet
+ * used.
+ *
+ * Called under snap_rwsem (write)
+ */
+static void __cleanup_empty_realms(struct ceph_mds_client *mdsc)
+{
+       struct ceph_snap_realm *realm;
+
+       spin_lock(&mdsc->snap_empty_lock);
+       while (!list_empty(&mdsc->snap_empty)) {
+               realm = list_entry(&mdsc->snap_empty, struct ceph_snap_realm,
+                                  empty_item);
+               list_del(&realm->empty_item);
+               spin_unlock(&mdsc->snap_empty_lock);
+               __destroy_snap_realm(mdsc, realm);
+               spin_lock(&mdsc->snap_empty_lock);
        }
+       spin_unlock(&mdsc->snap_empty_lock);
+}
+
+void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc)
+{
+       down_write(&mdsc->snap_rwsem);
+       __cleanup_empty_realms(mdsc);
+       up_write(&mdsc->snap_rwsem);
 }
 
 /*
@@ -166,7 +246,7 @@ static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc,
        }
        realm->parent_ino = parentino;
        realm->parent = parent;
-       get_realm(parent);
+       get_realm(mdsc, parent);
        list_add(&realm->child_item, &parent->children);
        return 1;
 }
@@ -438,13 +518,13 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
  *
  * Caller must hold snap_rwsem for write.
  */
-struct ceph_snap_realm *ceph_update_snap_trace(struct ceph_mds_client *mdsc,
-                                              void *p, void *e, bool deletion)
+int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
+                          void *p, void *e, bool deletion)
 {
        struct ceph_mds_snap_realm *ri;    /* encoded */
        __le64 *snaps;                     /* encoded */
        __le64 *prior_parent_snaps;        /* encoded */
-       struct ceph_snap_realm *realm, *first = NULL;
+       struct ceph_snap_realm *realm;
        int invalidate = 0;
        int err = -ENOMEM;
 
@@ -472,13 +552,6 @@ more:
                        goto fail;
                }
        }
-       if (!first) {
-               /* take note if this is the first realm in the trace
-                * (the most deeply nested)... we will return if (with
-                * nref bumped) to the caller. */
-               first = realm;
-               get_realm(realm);
-       }
 
        if (le64_to_cpu(ri->seq) > realm->seq) {
                dout(10, "update_snap_trace updating %llx %p %lld -> %lld\n",
@@ -547,13 +620,15 @@ more:
        if (p < e)
                goto more;
 
-       return first;
+       __cleanup_empty_realms(mdsc);
+
+       return 0;
 
 bad:
        err = -EINVAL;
 fail:
        derr(10, "update_snap_trace error %d\n", err);
-       return ERR_PTR(err);
+       return err;
 }
 
 
@@ -644,19 +719,19 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
        /* find session */
        mutex_lock(&mdsc->mutex);
        session = __ceph_get_mds_session(mdsc, mds);
-       if (session)
-               down_write(&mdsc->snap_rwsem);
        mutex_unlock(&mdsc->mutex);
        if (!session) {
                dout(10, "WTF, got snap but no session for mds%d\n", mds);
                return;
        }
-       locked_rwsem = 1;
 
        mutex_lock(&session->s_mutex);
        session->s_seq++;
        mutex_unlock(&session->s_mutex);
 
+       down_write(&mdsc->snap_rwsem);
+       locked_rwsem = 1;
+
        if (op == CEPH_SNAP_OP_SPLIT) {
                struct ceph_mds_snap_realm *ri;
 
@@ -683,7 +758,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
                        realm = ceph_create_snap_realm(mdsc, split);
                        if (IS_ERR(realm))
                                goto out;
-                       get_realm(realm);
+                       get_realm(mdsc, realm);
                }
 
                dout(10, "splitting snap_realm %llx %p\n", realm->ino, realm);
@@ -757,10 +832,8 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
         * update using the provided snap trace. if we are deleting a
         * snap, we can avoid queueing cap_snaps.
         */
-       realm = ceph_update_snap_trace(mdsc, p, e,
-                                      op == CEPH_SNAP_OP_DESTROY);
-       if (IS_ERR(realm))
-               goto bad;
+       ceph_update_snap_trace(mdsc, p, e,
+                              op == CEPH_SNAP_OP_DESTROY);
 
        if (op == CEPH_SNAP_OP_SPLIT) {
                /*
@@ -784,7 +857,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
                        list_add(&ci->i_snap_realm_item,
                                 &realm->inodes_with_caps);
                        ci->i_snap_realm = realm;
-                       get_realm(realm);
+                       get_realm(mdsc, realm);
                split_skip_inode:
                        spin_unlock(&inode->i_lock);
                        iput(inode);
@@ -794,7 +867,8 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
                ceph_put_snap_realm(mdsc, realm);
        }
 
-       ceph_put_snap_realm(mdsc, realm);
+       __cleanup_empty_realms(mdsc);
+
        up_write(&mdsc->snap_rwsem);
 
        flush_snaps(mdsc);
index fe7f39f90d29cfc303ceac5ef62ddcef255d634f..d4ca6e7c1de0ded72d56d5b1e4793de6da9023ab 100644 (file)
@@ -572,7 +572,7 @@ static inline void ceph_put_snap_context(struct ceph_snap_context *sc)
  */
 struct ceph_snap_realm {
        u64 ino;
-       int nref;
+       atomic_t nref;
        u64 created, seq;
        u64 parent_ino;
        u64 parent_since;   /* snapid when our current parent became so */
@@ -586,6 +586,8 @@ struct ceph_snap_realm {
        struct list_head children;       /* list of child realms */
        struct list_head child_item;
 
+       struct list_head empty_item;     /* if i have ref==0 */
+
        /* the current set of snaps for this realm */
        struct ceph_snap_context *cached_context;
 
@@ -611,15 +613,15 @@ struct ceph_snap_realm *ceph_get_snap_realm(struct ceph_mds_client *mdsc,
                                            u64 ino);
 extern void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
                                struct ceph_snap_realm *realm);
-extern struct ceph_snap_realm *ceph_update_snap_trace(struct ceph_mds_client *m,
-                                                     void *p, void *e,
-                                                     bool deletion);
+extern int ceph_update_snap_trace(struct ceph_mds_client *m,
+                                 void *p, void *e, bool deletion);
 extern void ceph_handle_snap(struct ceph_mds_client *mdsc,
                             struct ceph_msg *msg);
 extern void ceph_queue_cap_snap(struct ceph_inode_info *ci,
                                struct ceph_snap_context *snapc);
 extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
                                  struct ceph_cap_snap *capsnap);
+extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
 
 /*
  * a cap_snap is "pending" if it is still awaiting an in-progress