]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-client.git/commitdiff
ceph: caps reservation is being done per mdsc
authorYehuda Sadeh <yehuda@hq.newdream.net>
Fri, 18 Jun 2010 23:08:04 +0000 (16:08 -0700)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Mon, 21 Jun 2010 21:30:53 +0000 (14:30 -0700)
fs/ceph/caps.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h

index 041b608ab26c8f59825ffd9419c99cdc307e0c55..df97c3e5e41716b3dd1ecfd54697c21554697ab0 100644 (file)
@@ -117,6 +117,9 @@ void ceph_caps_init(struct ceph_mds_client *mdsc)
 {
        INIT_LIST_HEAD(&mdsc->caps_list);
        spin_lock_init(&mdsc->caps_list_lock);
+       INIT_LIST_HEAD(&mdsc->caps_releases);
+       spin_lock_init(&mdsc->caps_release_lock);
+       mdsc->caps_num_releases = 0;
 }
 
 void ceph_caps_finalize(struct ceph_mds_client *mdsc)
@@ -976,14 +979,22 @@ static void __queue_cap_release(struct ceph_mds_session *session,
        struct ceph_msg *msg;
        struct ceph_mds_cap_release *head;
        struct ceph_mds_cap_item *item;
+       struct ceph_mds_client *mdsc = session->s_mdsc;
 
        spin_lock(&session->s_cap_lock);
-       BUG_ON(!session->s_num_cap_releases);
-       msg = list_first_entry(&session->s_cap_releases,
-                              struct ceph_msg, list_head);
+       msg = session->s_cap_release_msg;
+       if (!msg) {
+               spin_lock(&mdsc->caps_release_lock);
+               BUG_ON(!mdsc->caps_num_releases);
+               msg = list_first_entry(&mdsc->caps_releases,
+                                      struct ceph_msg, list_head);
+               spin_unlock(&mdsc->caps_release_lock);
+               session->s_cap_release_msg = msg;
+               list_del_init(&msg->list_head);
+       }
 
        dout(" adding %llx release to mds%d msg %p (%d left)\n",
-            ino, session->s_mds, msg, session->s_num_cap_releases);
+            ino, session->s_mds, msg, mdsc->caps_num_releases);
 
        BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE);
        head = msg->front.iov_base;
@@ -994,12 +1005,13 @@ static void __queue_cap_release(struct ceph_mds_session *session,
        item->migrate_seq = cpu_to_le32(migrate_seq);
        item->seq = cpu_to_le32(issue_seq);
 
-       session->s_num_cap_releases--;
+       mdsc->caps_num_releases--;
 
        msg->front.iov_len += sizeof(*item);
        if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
+               session->s_cap_release_msg = NULL;
                dout(" release msg %p full\n", msg);
-               list_move_tail(&msg->list_head, &session->s_cap_releases_done);
+               list_add_tail(&msg->list_head, &session->s_cap_releases_done);
        } else {
                dout(" release msg %p at %d/%d (%d)\n", msg,
                     (int)le32_to_cpu(head->num),
index 6ca8e2904158f633d0e40bd68a230fe5222c1417..55cd87a35176e05fe707fc7de79b68fdcdddac87 100644 (file)
@@ -355,12 +355,11 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        atomic_set(&s->s_ref, 1);
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
-       s->s_num_cap_releases = 0;
        s->s_cap_iterator = NULL;
-       INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_LIST_HEAD(&s->s_cap_releases_done);
        INIT_LIST_HEAD(&s->s_cap_flushing);
        INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
+       s->s_cap_release_msg = NULL;
 
        dout("register_session mds%d\n", mds);
        if (mds >= mdsc->max_sessions) {
@@ -761,12 +760,8 @@ static void cleanup_cap_releases(struct ceph_mds_session *session)
        struct ceph_msg *msg;
 
        spin_lock(&session->s_cap_lock);
-       while (!list_empty(&session->s_cap_releases)) {
-               msg = list_first_entry(&session->s_cap_releases,
-                                      struct ceph_msg, list_head);
-               list_del_init(&msg->list_head);
-               ceph_msg_put(msg);
-       }
+       if (session->s_cap_release_msg)
+               ceph_msg_put(session->s_cap_release_msg);
        while (!list_empty(&session->s_cap_releases_done)) {
                msg = list_first_entry(&session->s_cap_releases_done,
                                       struct ceph_msg, list_head);
@@ -1105,6 +1100,13 @@ static int trim_caps(struct ceph_mds_client *mdsc,
        return 0;
 }
 
+static int __need_alloc_caps_reserve(struct ceph_mds_client *mdsc)
+{
+       int need = mdsc->caps_total_count +
+                  mdsc->max_sessions * CEPH_CAPS_PER_RELEASE;
+       return (mdsc->caps_num_releases < need);
+}
+
 /*
  * Allocate cap_release messages.  If there is a partially full message
  * in the queue, try to allocate enough to cover it's remainder, so that
@@ -1126,21 +1128,23 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
 
        spin_lock(&session->s_cap_lock);
 
-       if (!list_empty(&session->s_cap_releases)) {
-               msg = list_first_entry(&session->s_cap_releases,
-                                      struct ceph_msg,
-                                list_head);
+       msg = session->s_cap_release_msg;
+       if (msg) {
                head = msg->front.iov_base;
                num = le32_to_cpu(head->num);
                if (num) {
                        dout(" partial %p with (%d/%d)\n", msg, num,
                             (int)CEPH_CAPS_PER_RELEASE);
                        extra += CEPH_CAPS_PER_RELEASE - num;
+                       session->s_cap_release_msg = NULL;
                        partial = msg;
                }
        }
-       while (session->s_num_cap_releases < session->s_nr_caps + extra) {
-               spin_unlock(&session->s_cap_lock);
+       spin_unlock(&session->s_cap_lock);
+
+       spin_lock(&mdsc->caps_release_lock);
+       while (__need_alloc_caps_reserve(mdsc)) {
+               spin_unlock(&mdsc->caps_release_lock);
                msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
                                   GFP_NOFS);
                if (!msg)
@@ -1150,19 +1154,21 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
                head = msg->front.iov_base;
                head->num = cpu_to_le32(0);
                msg->front.iov_len = sizeof(*head);
-               spin_lock(&session->s_cap_lock);
-               list_add(&msg->list_head, &session->s_cap_releases);
-               session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
+               spin_lock(&mdsc->caps_release_lock);
+               list_add(&msg->list_head, &mdsc->caps_releases);
+               mdsc->caps_num_releases += CEPH_CAPS_PER_RELEASE;
        }
+       spin_unlock(&mdsc->caps_release_lock);
 
+       spin_lock(&session->s_cap_lock);
        if (partial) {
                head = partial->front.iov_base;
                num = le32_to_cpu(head->num);
                dout(" queueing partial %p with %d/%d\n", partial, num,
                     (int)CEPH_CAPS_PER_RELEASE);
-               list_move_tail(&partial->list_head,
+               list_add_tail(&partial->list_head,
                               &session->s_cap_releases_done);
-               session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
+               mdsc->caps_num_releases -= CEPH_CAPS_PER_RELEASE - num;
        }
        err = 0;
        spin_unlock(&session->s_cap_lock);
@@ -1251,16 +1257,23 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc,
        unsigned num;
 
        dout("discard_cap_releases mds%d\n", session->s_mds);
-       spin_lock(&session->s_cap_lock);
 
        /* zero out the in-progress message */
-       msg = list_first_entry(&session->s_cap_releases,
-                              struct ceph_msg, list_head);
-       head = msg->front.iov_base;
-       num = le32_to_cpu(head->num);
-       dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
-       head->num = cpu_to_le32(0);
-       session->s_num_cap_releases += num;
+       spin_lock(&session->s_cap_lock);
+       msg = session->s_cap_release_msg;
+       if (msg)
+               session->s_cap_release_msg = NULL;
+       spin_unlock(&session->s_cap_lock);
+
+       spin_lock(&mdsc->caps_release_lock);
+       if (msg) {
+               head = msg->front.iov_base;
+               num = le32_to_cpu(head->num);
+               dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
+               head->num = cpu_to_le32(0);
+               mdsc->caps_num_releases += num;
+               list_add(&msg->list_head, &mdsc->caps_releases);
+       }
 
        /* requeue completed messages */
        while (!list_empty(&session->s_cap_releases_done)) {
@@ -1272,13 +1285,12 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc,
                num = le32_to_cpu(head->num);
                dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
                     num);
-               session->s_num_cap_releases += num;
+               mdsc->caps_num_releases += num;
                head->num = cpu_to_le32(0);
                msg->front.iov_len = sizeof(*head);
-               list_add(&msg->list_head, &session->s_cap_releases);
+               list_add(&msg->list_head, &mdsc->caps_releases);
        }
-
-       spin_unlock(&session->s_cap_lock);
+       spin_unlock(&mdsc->caps_release_lock);
 }
 
 /*
index 1cc0eed87c8170a2a00f89ad61513f8679ff25a8..9f27905ad50aa50193360171e7fa147b9d433bcf 100644 (file)
@@ -111,10 +111,9 @@ struct ceph_mds_session {
        unsigned long     s_cap_ttl;  /* when session caps expire */
        struct list_head  s_caps;     /* all caps issued by this session */
        int               s_nr_caps, s_trim_caps;
-       int               s_num_cap_releases;
-       struct list_head  s_cap_releases; /* waiting cap_release messages */
        struct list_head  s_cap_releases_done; /* ready to send */
        struct ceph_cap  *s_cap_iterator;
+       struct ceph_msg  *s_cap_release_msg;
 
        /* protected by mutex */
        struct list_head  s_cap_flushing;     /* inodes w/ flushing caps */
@@ -278,15 +277,18 @@ struct ceph_mds_client {
         *
         * Reservations are 'owned' by a ceph_cap_reservation context.
         */
-       spinlock_t      caps_list_lock;
-       struct          list_head caps_list; /* unused (reserved or
+       spinlock_t       caps_list_lock;
+       struct           list_head caps_list; /* unused (reserved or
                                                unreserved) */
-       int             caps_total_count;    /* total caps allocated */
-       int             caps_use_count;      /* in use */
-       int             caps_reserve_count;  /* unused, reserved */
-       int             caps_avail_count;    /* unused, unreserved */
-       int             caps_min_count;      /* keep at least this many
+       int              caps_total_count;    /* total caps allocated */
+       int              caps_use_count;      /* in use */
+       int              caps_reserve_count;  /* unused, reserved */
+       int              caps_avail_count;    /* unused, unreserved */
+       int              caps_min_count;      /* keep at least this many
                                                (unreserved) */
+       int              caps_num_releases;
+       spinlock_t       caps_release_lock;
+       struct list_head caps_releases;
 
 #ifdef CONFIG_DEBUG_FS
        struct dentry     *debugfs_file;