From 97335e09576abad9eb83c7e22cc1350c64803eb7 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 18 Jun 2010 16:08:04 -0700 Subject: [PATCH] ceph: caps reservation is being done per mdsc --- fs/ceph/caps.c | 24 ++++++++++---- fs/ceph/mds_client.c | 74 +++++++++++++++++++++++++------------------- fs/ceph/mds_client.h | 20 ++++++------ 3 files changed, 72 insertions(+), 46 deletions(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 041b608ab26c..df97c3e5e417 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -117,6 +117,9 @@ void ceph_caps_init(struct ceph_mds_client *mdsc) { INIT_LIST_HEAD(&mdsc->caps_list); spin_lock_init(&mdsc->caps_list_lock); + INIT_LIST_HEAD(&mdsc->caps_releases); + spin_lock_init(&mdsc->caps_release_lock); + mdsc->caps_num_releases = 0; } void ceph_caps_finalize(struct ceph_mds_client *mdsc) @@ -976,14 +979,22 @@ static void __queue_cap_release(struct ceph_mds_session *session, struct ceph_msg *msg; struct ceph_mds_cap_release *head; struct ceph_mds_cap_item *item; + struct ceph_mds_client *mdsc = session->s_mdsc; spin_lock(&session->s_cap_lock); - BUG_ON(!session->s_num_cap_releases); - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, list_head); + msg = session->s_cap_release_msg; + if (!msg) { + spin_lock(&mdsc->caps_release_lock); + BUG_ON(!mdsc->caps_num_releases); + msg = list_first_entry(&mdsc->caps_releases, + struct ceph_msg, list_head); + spin_unlock(&mdsc->caps_release_lock); + session->s_cap_release_msg = msg; + list_del_init(&msg->list_head); + } dout(" adding %llx release to mds%d msg %p (%d left)\n", - ino, session->s_mds, msg, session->s_num_cap_releases); + ino, session->s_mds, msg, mdsc->caps_num_releases); BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE); head = msg->front.iov_base; @@ -994,12 +1005,13 @@ static void __queue_cap_release(struct ceph_mds_session *session, item->migrate_seq = cpu_to_le32(migrate_seq); item->seq = cpu_to_le32(issue_seq); - session->s_num_cap_releases--; + mdsc->caps_num_releases--; msg->front.iov_len += sizeof(*item); if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { + session->s_cap_release_msg = NULL; dout(" release msg %p full\n", msg); - list_move_tail(&msg->list_head, &session->s_cap_releases_done); + list_add_tail(&msg->list_head, &session->s_cap_releases_done); } else { dout(" release msg %p at %d/%d (%d)\n", msg, (int)le32_to_cpu(head->num), diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 6ca8e2904158..55cd87a35176 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -355,12 +355,11 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, atomic_set(&s->s_ref, 1); INIT_LIST_HEAD(&s->s_waiting); INIT_LIST_HEAD(&s->s_unsafe); - s->s_num_cap_releases = 0; s->s_cap_iterator = NULL; - INIT_LIST_HEAD(&s->s_cap_releases); INIT_LIST_HEAD(&s->s_cap_releases_done); INIT_LIST_HEAD(&s->s_cap_flushing); INIT_LIST_HEAD(&s->s_cap_snaps_flushing); + s->s_cap_release_msg = NULL; dout("register_session mds%d\n", mds); if (mds >= mdsc->max_sessions) { @@ -761,12 +760,8 @@ static void cleanup_cap_releases(struct ceph_mds_session *session) struct ceph_msg *msg; spin_lock(&session->s_cap_lock); - while (!list_empty(&session->s_cap_releases)) { - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); - ceph_msg_put(msg); - } + if (session->s_cap_release_msg) + ceph_msg_put(session->s_cap_release_msg); while (!list_empty(&session->s_cap_releases_done)) { msg = list_first_entry(&session->s_cap_releases_done, struct ceph_msg, list_head); @@ -1105,6 +1100,13 @@ static int trim_caps(struct ceph_mds_client *mdsc, return 0; } +static int __need_alloc_caps_reserve(struct ceph_mds_client *mdsc) +{ + int need = mdsc->caps_total_count + + mdsc->max_sessions * CEPH_CAPS_PER_RELEASE; + return (mdsc->caps_num_releases < need); +} + /* * Allocate cap_release messages. If there is a partially full message * in the queue, try to allocate enough to cover it's remainder, so that @@ -1126,21 +1128,23 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc, spin_lock(&session->s_cap_lock); - if (!list_empty(&session->s_cap_releases)) { - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, - list_head); + msg = session->s_cap_release_msg; + if (msg) { head = msg->front.iov_base; num = le32_to_cpu(head->num); if (num) { dout(" partial %p with (%d/%d)\n", msg, num, (int)CEPH_CAPS_PER_RELEASE); extra += CEPH_CAPS_PER_RELEASE - num; + session->s_cap_release_msg = NULL; partial = msg; } } - while (session->s_num_cap_releases < session->s_nr_caps + extra) { - spin_unlock(&session->s_cap_lock); + spin_unlock(&session->s_cap_lock); + + spin_lock(&mdsc->caps_release_lock); + while (__need_alloc_caps_reserve(mdsc)) { + spin_unlock(&mdsc->caps_release_lock); msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE, GFP_NOFS); if (!msg) @@ -1150,19 +1154,21 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc, head = msg->front.iov_base; head->num = cpu_to_le32(0); msg->front.iov_len = sizeof(*head); - spin_lock(&session->s_cap_lock); - list_add(&msg->list_head, &session->s_cap_releases); - session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE; + spin_lock(&mdsc->caps_release_lock); + list_add(&msg->list_head, &mdsc->caps_releases); + mdsc->caps_num_releases += CEPH_CAPS_PER_RELEASE; } + spin_unlock(&mdsc->caps_release_lock); + spin_lock(&session->s_cap_lock); if (partial) { head = partial->front.iov_base; num = le32_to_cpu(head->num); dout(" queueing partial %p with %d/%d\n", partial, num, (int)CEPH_CAPS_PER_RELEASE); - list_move_tail(&partial->list_head, + list_add_tail(&partial->list_head, &session->s_cap_releases_done); - session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num; + mdsc->caps_num_releases -= CEPH_CAPS_PER_RELEASE - num; } err = 0; spin_unlock(&session->s_cap_lock); @@ -1251,16 +1257,23 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc, unsigned num; dout("discard_cap_releases mds%d\n", session->s_mds); - spin_lock(&session->s_cap_lock); /* zero out the in-progress message */ - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, list_head); - head = msg->front.iov_base; - num = le32_to_cpu(head->num); - dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num); - head->num = cpu_to_le32(0); - session->s_num_cap_releases += num; + spin_lock(&session->s_cap_lock); + msg = session->s_cap_release_msg; + if (msg) + session->s_cap_release_msg = NULL; + spin_unlock(&session->s_cap_lock); + + spin_lock(&mdsc->caps_release_lock); + if (msg) { + head = msg->front.iov_base; + num = le32_to_cpu(head->num); + dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num); + head->num = cpu_to_le32(0); + mdsc->caps_num_releases += num; + list_add(&msg->list_head, &mdsc->caps_releases); + } /* requeue completed messages */ while (!list_empty(&session->s_cap_releases_done)) { @@ -1272,13 +1285,12 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc, num = le32_to_cpu(head->num); dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num); - session->s_num_cap_releases += num; + mdsc->caps_num_releases += num; head->num = cpu_to_le32(0); msg->front.iov_len = sizeof(*head); - list_add(&msg->list_head, &session->s_cap_releases); + list_add(&msg->list_head, &mdsc->caps_releases); } - - spin_unlock(&session->s_cap_lock); + spin_unlock(&mdsc->caps_release_lock); } /* diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 1cc0eed87c81..9f27905ad50a 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -111,10 +111,9 @@ struct ceph_mds_session { unsigned long s_cap_ttl; /* when session caps expire */ struct list_head s_caps; /* all caps issued by this session */ int s_nr_caps, s_trim_caps; - int s_num_cap_releases; - struct list_head s_cap_releases; /* waiting cap_release messages */ struct list_head s_cap_releases_done; /* ready to send */ struct ceph_cap *s_cap_iterator; + struct ceph_msg *s_cap_release_msg; /* protected by mutex */ struct list_head s_cap_flushing; /* inodes w/ flushing caps */ @@ -278,15 +277,18 @@ struct ceph_mds_client { * * Reservations are 'owned' by a ceph_cap_reservation context. */ - spinlock_t caps_list_lock; - struct list_head caps_list; /* unused (reserved or + spinlock_t caps_list_lock; + struct list_head caps_list; /* unused (reserved or unreserved) */ - int caps_total_count; /* total caps allocated */ - int caps_use_count; /* in use */ - int caps_reserve_count; /* unused, reserved */ - int caps_avail_count; /* unused, unreserved */ - int caps_min_count; /* keep at least this many + int caps_total_count; /* total caps allocated */ + int caps_use_count; /* in use */ + int caps_reserve_count; /* unused, reserved */ + int caps_avail_count; /* unused, unreserved */ + int caps_min_count; /* keep at least this many (unreserved) */ + int caps_num_releases; + spinlock_t caps_release_lock; + struct list_head caps_releases; #ifdef CONFIG_DEBUG_FS struct dentry *debugfs_file; -- 2.47.3