{
INIT_LIST_HEAD(&mdsc->caps_list);
spin_lock_init(&mdsc->caps_list_lock);
+ INIT_LIST_HEAD(&mdsc->caps_releases);
+ spin_lock_init(&mdsc->caps_release_lock);
+ mdsc->caps_num_releases = 0;
}
void ceph_caps_finalize(struct ceph_mds_client *mdsc)
struct ceph_msg *msg;
struct ceph_mds_cap_release *head;
struct ceph_mds_cap_item *item;
+ struct ceph_mds_client *mdsc = session->s_mdsc;
spin_lock(&session->s_cap_lock);
- BUG_ON(!session->s_num_cap_releases);
- msg = list_first_entry(&session->s_cap_releases,
- struct ceph_msg, list_head);
+ msg = session->s_cap_release_msg;
+ if (!msg) {
+ spin_lock(&mdsc->caps_release_lock);
+ BUG_ON(!mdsc->caps_num_releases);
+ msg = list_first_entry(&mdsc->caps_releases,
+ struct ceph_msg, list_head);
+ spin_unlock(&mdsc->caps_release_lock);
+ session->s_cap_release_msg = msg;
+ list_del_init(&msg->list_head);
+ }
dout(" adding %llx release to mds%d msg %p (%d left)\n",
- ino, session->s_mds, msg, session->s_num_cap_releases);
+ ino, session->s_mds, msg, mdsc->caps_num_releases);
BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE);
head = msg->front.iov_base;
item->migrate_seq = cpu_to_le32(migrate_seq);
item->seq = cpu_to_le32(issue_seq);
- session->s_num_cap_releases--;
+ mdsc->caps_num_releases--;
msg->front.iov_len += sizeof(*item);
if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
+ session->s_cap_release_msg = NULL;
dout(" release msg %p full\n", msg);
- list_move_tail(&msg->list_head, &session->s_cap_releases_done);
+ list_add_tail(&msg->list_head, &session->s_cap_releases_done);
} else {
dout(" release msg %p at %d/%d (%d)\n", msg,
(int)le32_to_cpu(head->num),
atomic_set(&s->s_ref, 1);
INIT_LIST_HEAD(&s->s_waiting);
INIT_LIST_HEAD(&s->s_unsafe);
- s->s_num_cap_releases = 0;
s->s_cap_iterator = NULL;
- INIT_LIST_HEAD(&s->s_cap_releases);
INIT_LIST_HEAD(&s->s_cap_releases_done);
INIT_LIST_HEAD(&s->s_cap_flushing);
INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
+ s->s_cap_release_msg = NULL;
dout("register_session mds%d\n", mds);
if (mds >= mdsc->max_sessions) {
struct ceph_msg *msg;
spin_lock(&session->s_cap_lock);
- while (!list_empty(&session->s_cap_releases)) {
- msg = list_first_entry(&session->s_cap_releases,
- struct ceph_msg, list_head);
- list_del_init(&msg->list_head);
- ceph_msg_put(msg);
- }
+ if (session->s_cap_release_msg)
+ ceph_msg_put(session->s_cap_release_msg);
while (!list_empty(&session->s_cap_releases_done)) {
msg = list_first_entry(&session->s_cap_releases_done,
struct ceph_msg, list_head);
return 0;
}
+/*
+ * Return true if the global pool of preallocated cap_release message
+ * slots (caps_num_releases) is below the worst-case requirement: one
+ * slot for every currently allocated cap plus one full release
+ * message's worth of slots per possible session.
+ *
+ * Called with mdsc->caps_release_lock held (the caller's allocation
+ * loop drops and retakes it around each ceph_msg_new()).
+ * NOTE(review): caps_total_count appears to be guarded by
+ * caps_list_lock elsewhere, so this read may be slightly stale --
+ * presumably acceptable for a reserve heuristic; confirm.
+ */
+static int __need_alloc_caps_reserve(struct ceph_mds_client *mdsc)
+{
+ int need = mdsc->caps_total_count +
+ mdsc->max_sessions * CEPH_CAPS_PER_RELEASE;
+ return (mdsc->caps_num_releases < need);
+}
+
/*
* Allocate cap_release messages. If there is a partially full message
* in the queue, try to allocate enough to cover its remainder, so that
spin_lock(&session->s_cap_lock);
- if (!list_empty(&session->s_cap_releases)) {
- msg = list_first_entry(&session->s_cap_releases,
- struct ceph_msg,
- list_head);
+ msg = session->s_cap_release_msg;
+ if (msg) {
head = msg->front.iov_base;
num = le32_to_cpu(head->num);
if (num) {
dout(" partial %p with (%d/%d)\n", msg, num,
(int)CEPH_CAPS_PER_RELEASE);
extra += CEPH_CAPS_PER_RELEASE - num;
+ session->s_cap_release_msg = NULL;
partial = msg;
}
}
- while (session->s_num_cap_releases < session->s_nr_caps + extra) {
- spin_unlock(&session->s_cap_lock);
+ spin_unlock(&session->s_cap_lock);
+
+ spin_lock(&mdsc->caps_release_lock);
+ while (__need_alloc_caps_reserve(mdsc)) {
+ spin_unlock(&mdsc->caps_release_lock);
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
GFP_NOFS);
if (!msg)
head = msg->front.iov_base;
head->num = cpu_to_le32(0);
msg->front.iov_len = sizeof(*head);
- spin_lock(&session->s_cap_lock);
- list_add(&msg->list_head, &session->s_cap_releases);
- session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
+ spin_lock(&mdsc->caps_release_lock);
+ list_add(&msg->list_head, &mdsc->caps_releases);
+ mdsc->caps_num_releases += CEPH_CAPS_PER_RELEASE;
}
+ spin_unlock(&mdsc->caps_release_lock);
+ spin_lock(&session->s_cap_lock);
if (partial) {
head = partial->front.iov_base;
num = le32_to_cpu(head->num);
dout(" queueing partial %p with %d/%d\n", partial, num,
(int)CEPH_CAPS_PER_RELEASE);
- list_move_tail(&partial->list_head,
+ list_add_tail(&partial->list_head,
&session->s_cap_releases_done);
- session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
+ mdsc->caps_num_releases -= CEPH_CAPS_PER_RELEASE - num;
}
err = 0;
spin_unlock(&session->s_cap_lock);
unsigned num;
dout("discard_cap_releases mds%d\n", session->s_mds);
- spin_lock(&session->s_cap_lock);
/* zero out the in-progress message */
- msg = list_first_entry(&session->s_cap_releases,
- struct ceph_msg, list_head);
- head = msg->front.iov_base;
- num = le32_to_cpu(head->num);
- dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
- head->num = cpu_to_le32(0);
- session->s_num_cap_releases += num;
+ spin_lock(&session->s_cap_lock);
+ msg = session->s_cap_release_msg;
+ if (msg)
+ session->s_cap_release_msg = NULL;
+ spin_unlock(&session->s_cap_lock);
+
+ spin_lock(&mdsc->caps_release_lock);
+ if (msg) {
+ head = msg->front.iov_base;
+ num = le32_to_cpu(head->num);
+ dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
+ head->num = cpu_to_le32(0);
+ mdsc->caps_num_releases += num;
+ list_add(&msg->list_head, &mdsc->caps_releases);
+ }
/* requeue completed messages */
while (!list_empty(&session->s_cap_releases_done)) {
num = le32_to_cpu(head->num);
dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
num);
- session->s_num_cap_releases += num;
+ mdsc->caps_num_releases += num;
head->num = cpu_to_le32(0);
msg->front.iov_len = sizeof(*head);
- list_add(&msg->list_head, &session->s_cap_releases);
+ list_add(&msg->list_head, &mdsc->caps_releases);
}
-
- spin_unlock(&session->s_cap_lock);
+ spin_unlock(&mdsc->caps_release_lock);
}
/*
unsigned long s_cap_ttl; /* when session caps expire */
struct list_head s_caps; /* all caps issued by this session */
int s_nr_caps, s_trim_caps;
- int s_num_cap_releases;
- struct list_head s_cap_releases; /* waiting cap_release messages */
struct list_head s_cap_releases_done; /* ready to send */
struct ceph_cap *s_cap_iterator;
+ struct ceph_msg *s_cap_release_msg;
/* protected by mutex */
struct list_head s_cap_flushing; /* inodes w/ flushing caps */
*
* Reservations are 'owned' by a ceph_cap_reservation context.
*/
- spinlock_t caps_list_lock;
- struct list_head caps_list; /* unused (reserved or
+ spinlock_t caps_list_lock;
+ struct list_head caps_list; /* unused (reserved or
unreserved) */
- int caps_total_count; /* total caps allocated */
- int caps_use_count; /* in use */
- int caps_reserve_count; /* unused, reserved */
- int caps_avail_count; /* unused, unreserved */
- int caps_min_count; /* keep at least this many
+ int caps_total_count; /* total caps allocated */
+ int caps_use_count; /* in use */
+ int caps_reserve_count; /* unused, reserved */
+ int caps_avail_count; /* unused, unreserved */
+ int caps_min_count; /* keep at least this many
(unreserved) */
+ int caps_num_releases;
+ spinlock_t caps_release_lock;
+ struct list_head caps_releases;
#ifdef CONFIG_DEBUG_FS
struct dentry *debugfs_file;