From 629f053aab22aadd13725a1092fa47c62c73cf9a Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 30 Apr 2008 13:19:35 -0700 Subject: [PATCH] kclient: take spinlock while processing acks in msgr --- src/kernel/messenger.c | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 55ce29eb4e27d..1ce8b8a8b4a3f 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -284,9 +284,11 @@ static void ceph_fault(struct ceph_connection *con) * connection in a STANDBY state otherwise retry with * delay */ + spin_lock(&con->out_queue_lock); if (list_empty(&con->out_queue)) { dout(10, "setting STANDBY bit\n"); set_bit(STANDBY, &con->state); + spin_unlock(&con->out_queue_lock); return; } @@ -294,11 +296,10 @@ static void ceph_fault(struct ceph_connection *con) derr(1, "CONNECTING bit not set\n"); /* TBD: reset buffer correctly */ /* reset buffer */ - spin_lock(&con->out_queue_lock); list_splice_init(&con->out_sent, &con->out_queue); - spin_unlock(&con->out_queue_lock); clear_bit(OPEN, &con->state); } + spin_unlock(&con->out_queue_lock); /* retry with delay */ ceph_queue_delayed_write(con); @@ -425,8 +426,7 @@ out: */ static void prepare_write_message(struct ceph_connection *con) { - struct ceph_msg *m = list_entry(con->out_queue.next, - struct ceph_msg, list_head); + struct ceph_msg *m; int v = 0; con->out_kvec_bytes = 0; @@ -442,6 +442,8 @@ static void prepare_write_message(struct ceph_connection *con) } /* move to sending/sent list */ + m = list_entry(con->out_queue.next, + struct ceph_msg, list_head); list_del_init(&m->list_head); list_add_tail(&m->list_head, &con->out_sent); con->out_msg = m; /* we dont bother taking a reference here. */ @@ -842,6 +844,7 @@ static void process_ack(struct ceph_connection *con) u32 ack = le32_to_cpu(con->in_partial_ack); u64 seq; + spin_lock(&con->out_queue_lock); while (!list_empty(&con->out_sent)) { m = list_entry(con->out_sent.next, struct ceph_msg, list_head); seq = le64_to_cpu(m->hdr.seq); @@ -852,6 +855,7 @@ static void process_ack(struct ceph_connection *con) list_del_init(&m->list_head); ceph_msg_put(m); } + spin_unlock(&con->out_queue_lock); con->in_tag = CEPH_MSGR_TAG_READY; } @@ -920,6 +924,7 @@ static void reset_connection(struct ceph_connection *con) /* reset connection, out_queue, msg_ and connect_seq */ /* discard existing out_queue and msg_seq */ + spin_lock(&con->out_queue_lock); ceph_msg_put_list(&con->out_queue); ceph_msg_put_list(&con->out_sent); @@ -928,6 +933,7 @@ static void reset_connection(struct ceph_connection *con) con->out_msg = 0; con->in_seq = 0; con->in_msg = 0; + spin_unlock(&con->out_queue_lock); } static void process_connect(struct ceph_connection *con) @@ -1533,7 +1539,7 @@ void ceph_msg_put(struct ceph_msg *m) atomic_read(&m->nref)-1); if (atomic_dec_and_test(&m->nref)) { dout(20, "ceph_msg_put last one on %p\n", m); - BUG_ON(!list_empty(&m->list_head)); + WARN_ON(!list_empty(&m->list_head)); kfree(m->front.iov_base); kfree(m); } -- 2.39.5