]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: allow sent msgs to be revoked from a connection
authorSage Weil <sage@newdream.net>
Wed, 9 Sep 2009 23:46:42 +0000 (16:46 -0700)
committerSage Weil <sage@newdream.net>
Wed, 9 Sep 2009 23:46:42 +0000 (16:46 -0700)
The simplest solution seemed to be to replace the spinlock with a
mutex and carry it for the duration of try_write() (including the
NOWAIT calls to kernel_sendmsg, etc).  If there is contention the
overhead will be a bit higher, but the code is much simpler.

src/TODO
src/include/msgr.h
src/kernel/caps.c
src/kernel/mds_client.c
src/kernel/messenger.c
src/kernel/messenger.h
src/kernel/osd_client.c
src/kernel/osd_client.h
src/msg/SimpleMessenger.cc

index 0e029d0b8ec9786d14fe00a26e5b71c13627ba79..b06de2f66d0980420fb1ee03c1352939935b4816 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -38,9 +38,6 @@ v0.15
 
 - msgr
   - check protocols during connection handshake, not per-message?
-- kclient msgr
-  - allow msg to be revoked from connection (per-msg queue_seq, mutex)
-  - and then kill maybe_dup_msg
 
 
 bugs
@@ -91,7 +88,6 @@ repair
 
 kclient
 - fix up ESTALE handling
-- allow struct ceph_msg revoke from a connection
 - don't retry on ENOMEM on non-nofail requests in kick_requests
 - make cap import/export more efficient?
 - flock, fnctl locks
index de85ba1dfb80e540e5ab22a50a10b28b0b68ee19..e80ebfd1c7a34009a722901356445ed23aa928c9 100644 (file)
@@ -21,7 +21,7 @@
  * whenever the wire protocol changes.  try to keep this string length
  * constant.
  */
-#define CEPH_BANNER "ceph v016"
+#define CEPH_BANNER "ceph v017"
 #define CEPH_BANNER_MAX_LEN 30
 
 
@@ -147,11 +147,11 @@ struct ceph_msg_header {
  * follows data payload
  */
 struct ceph_msg_footer {
-       __le32 flags;
        __le32 front_crc, middle_crc, data_crc;
+       __u8 flags;
 } __attribute__ ((packed));
 
-#define CEPH_MSG_FOOTER_ABORTED   (1<<0)   /* drop this message */
+#define CEPH_MSG_FOOTER_COMPLETE  (1<<0)   /* msg wasn't aborted */
 #define CEPH_MSG_FOOTER_NOCRC     (1<<1)   /* no data crc */
 
 
index a6f16d4b0dc0f361365838a0e4ff3d003fe21f96..35aec4fb864235e5673a67b7a9081006a964e8eb 100644 (file)
@@ -2682,6 +2682,10 @@ void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
 /*
  * Helpers for embedding cap and dentry lease releases into mds
  * requests.
+ *
+ * @force is used by dentry_release (below) to force inclusion of a
+ * record for the directory inode, even when there aren't any caps to
+ * drop.
  */
 int ceph_encode_inode_release(void **p, struct inode *inode,
                              int mds, int drop, int unless, int force)
index c641eb33425de2435b20c6e0ff7d03c6b8a93437..daa60509d040545dc1fc5c285de0312933ea9ac8 100644 (file)
@@ -661,12 +661,14 @@ static void cleanup_cap_releases(struct ceph_mds_session *session)
        while (!list_empty(&session->s_cap_releases)) {
                msg = list_first_entry(&session->s_cap_releases,
                                       struct ceph_msg, list_head);
-               ceph_msg_remove(msg);
+               list_del_init(&msg->list_head);
+               ceph_msg_put(msg);
        }
        while (!list_empty(&session->s_cap_releases_done)) {
                msg = list_first_entry(&session->s_cap_releases_done,
                                       struct ceph_msg, list_head);
-               ceph_msg_remove(msg);
+               list_del_init(&msg->list_head);
+               ceph_msg_put(msg);
        }
        spin_unlock(&session->s_cap_lock);
 }
index 9d69e46fc0ba403dc02f0fc5c6766b6b6cf9734c..99c691aad278300e801be88291cc9ca29a4bd715 100644 (file)
@@ -225,19 +225,34 @@ static int con_close_socket(struct ceph_connection *con)
  * Reset a connection.  Discard all incoming and outgoing messages
  * and clear *_seq state.
  */
+static void ceph_msg_remove(struct ceph_msg *msg)
+{
+       list_del_init(&msg->list_head);
+       ceph_msg_put(msg);
+}
+static void ceph_msg_remove_list(struct list_head *head)
+{
+       while (!list_empty(head)) {
+               struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
+                                                       list_head);
+               ceph_msg_remove(msg);
+       }
+}
+
 static void reset_connection(struct ceph_connection *con)
 {
        /* reset connection, out_queue, msg_ and connect_seq */
        /* discard existing out_queue and msg_seq */
-       spin_lock(&con->out_queue_lock);
-       ceph_msg_put_list(&con->out_queue);
-       ceph_msg_put_list(&con->out_sent);
+       mutex_lock(&con->out_mutex);
+       ceph_msg_remove_list(&con->out_queue);
+       ceph_msg_remove_list(&con->out_sent);
 
        con->connect_seq = 0;
        con->out_seq = 0;
+       con->out_qlen = 0;
        con->out_msg = NULL;
        con->in_seq = 0;
-       spin_unlock(&con->out_queue_lock);
+       mutex_unlock(&con->out_mutex);
 }
 
 /*
@@ -308,7 +323,7 @@ void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con)
        memset(con, 0, sizeof(*con));
        atomic_set(&con->nref, 1);
        con->msgr = msgr;
-       spin_lock_init(&con->out_queue_lock);
+       mutex_init(&con->out_mutex);
        INIT_LIST_HEAD(&con->out_queue);
        INIT_LIST_HEAD(&con->out_sent);
        INIT_DELAYED_WORK(&con->work, con_work);
@@ -341,6 +356,7 @@ static void prepare_write_message_footer(struct ceph_connection *con, int v)
        struct ceph_msg *m = con->out_msg;
 
        dout("prepare_write_message_footer %p\n", con);
+       con->out_kvec_is_msg = true;
        con->out_kvec[v].iov_base = &m->footer;
        con->out_kvec[v].iov_len = sizeof(m->footer);
        con->out_kvec_bytes += sizeof(m->footer);
@@ -358,6 +374,7 @@ static void prepare_write_message(struct ceph_connection *con)
        int v = 0;
 
        con->out_kvec_bytes = 0;
+       con->out_kvec_is_msg = true;
 
        /* Sneak an ack in there first?  If we can get it into the same
         * TCP packet that's a good thing. */
@@ -377,8 +394,13 @@ static void prepare_write_message(struct ceph_connection *con)
        list_move_tail(&m->list_head, &con->out_sent);
        con->out_msg = m;   /* we don't bother taking a reference here. */
 
+       BUG_ON(!con->out_qlen);
+       con->out_qlen--;
+
+       m->hdr.seq = cpu_to_le64(++con->out_seq);
+
        dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
-            m, le64_to_cpu(m->hdr.seq), le16_to_cpu(m->hdr.type),
+            m, con->out_seq, le16_to_cpu(m->hdr.type),
             le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
             le32_to_cpu(m->hdr.data_len),
             m->nr_pages);
@@ -401,7 +423,7 @@ static void prepare_write_message(struct ceph_connection *con)
        con->out_msg->hdr.crc =
                cpu_to_le32(crc32c(0, (void *)&m->hdr,
                                      sizeof(m->hdr) - sizeof(m->hdr.crc)));
-       con->out_msg->footer.flags = 0;
+       con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
        con->out_msg->footer.front_crc =
                cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len));
        if (m->middle)
@@ -555,6 +577,7 @@ static int write_partial_kvec(struct ceph_connection *con)
                }
        }
        con->out_kvec_left = 0;
+       con->out_kvec_is_msg = false;
        ret = 1;
 out:
        dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
@@ -590,7 +613,6 @@ static int write_partial_msg_pages(struct ceph_connection *con)
                 * to map the page.  if our pages[] has been revoked, use the
                 * zero page.
                 */
-               mutex_lock(&msg->page_mutex);
                if (msg->pages) {
                        page = msg->pages[con->out_msg_pos.page];
                        if (crc)
@@ -620,7 +642,6 @@ static int write_partial_msg_pages(struct ceph_connection *con)
                if (crc && msg->pages)
                        kunmap(page);
 
-               mutex_unlock(&msg->page_mutex);
                if (ret <= 0)
                        goto out;
 
@@ -637,8 +658,7 @@ static int write_partial_msg_pages(struct ceph_connection *con)
 
        /* prepare and queue up footer, too */
        if (!crc)
-               con->out_msg->footer.flags |=
-                       cpu_to_le32(CEPH_MSG_FOOTER_NOCRC);
+               con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
        con->out_kvec_bytes = 0;
        con->out_kvec_left = 0;
        con->out_kvec_cur = con->out_kvec;
@@ -648,7 +668,28 @@ out:
        return ret;
 }
 
+/*
+ * write some zeros
+ */
+static int write_partial_skip(struct ceph_connection *con)
+{
+       int ret;
 
+       while (con->out_skip > 0) {
+               struct kvec iov = {
+                       .iov_base = page_address(con->msgr->zero_page),
+                       .iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE)
+               };
+
+               ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1);
+               if (ret <= 0)
+                       goto out;
+               con->out_skip -= ret;
+       }
+       ret = 1;
+out:
+       return ret;
+}
 
 /*
  * Prepare to read connection handshake, or an ack.
@@ -896,7 +937,7 @@ static void process_ack(struct ceph_connection *con)
        u32 ack = le32_to_cpu(con->in_temp_ack);
        u64 seq;
 
-       spin_lock(&con->out_queue_lock);
+       mutex_lock(&con->out_mutex);
        while (!list_empty(&con->out_sent)) {
                m = list_first_entry(&con->out_sent, struct ceph_msg,
                                     list_head);
@@ -907,7 +948,7 @@ static void process_ack(struct ceph_connection *con)
                     le16_to_cpu(m->hdr.type), m);
                ceph_msg_remove(m);
        }
-       spin_unlock(&con->out_queue_lock);
+       mutex_unlock(&con->out_mutex);
        prepare_read_tag(con);
 }
 
@@ -1056,17 +1097,7 @@ static int read_partial_message(struct ceph_connection *con)
        while (con->in_msg_pos.data_pos < data_len) {
                left = min((int)(data_len - con->in_msg_pos.data_pos),
                           (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
-               mutex_lock(&m->page_mutex);
-               if (!m->pages) {
-                       dout("%p pages revoked during msg read\n", m);
-                       mutex_unlock(&m->page_mutex);
-                       con->in_base_pos = con->in_msg_pos.data_pos
-                               - data_len - sizeof(m->footer);
-                       ceph_msg_put(m);
-                       con->in_msg = NULL;
-                       con->in_tag = CEPH_MSGR_TAG_READY;
-                       return 0;
-               }
+               BUG_ON(m->pages == NULL);
                p = kmap(m->pages[con->in_msg_pos.page]);
                ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
                                       left);
@@ -1075,7 +1106,6 @@ static int read_partial_message(struct ceph_connection *con)
                                crc32c(con->in_data_crc,
                                          p + con->in_msg_pos.page_pos, ret);
                kunmap(m->pages[con->in_msg_pos.page]);
-               mutex_unlock(&m->page_mutex);
                if (ret <= 0)
                        return ret;
                con->in_msg_pos.data_pos += ret;
@@ -1114,7 +1144,7 @@ no_data:
                return -EBADMSG;
        }
        if (datacrc &&
-           (le32_to_cpu(m->footer.flags) & CEPH_MSG_FOOTER_NOCRC) == 0 &&
+           (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
            con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
                pr_err("ceph read_partial_message %p data crc %u != exp. %u\n",
                       m,
@@ -1140,9 +1170,9 @@ static void process_message(struct ceph_connection *con)
        if (con->peer_name.type == 0)
                con->peer_name = msg->hdr.src.name;
 
-       spin_lock(&con->out_queue_lock);
+       mutex_lock(&con->out_mutex);
        con->in_seq++;
-       spin_unlock(&con->out_queue_lock);
+       mutex_unlock(&con->out_mutex);
 
        dout("===== %p %llu from %s%d %d=%s len %d+%d (%u %u %u) =====\n",
             msg, le64_to_cpu(msg->hdr.seq),
@@ -1157,12 +1187,6 @@ static void process_message(struct ceph_connection *con)
 }
 
 
-
-
-
-
-
-
 /*
  * Write something to the socket.  Called in a worker thread when the
  * socket appears to be writeable and we have something ready to send.
@@ -1175,6 +1199,7 @@ static int try_write(struct ceph_connection *con)
        dout("try_write start %p state %lu nref %d\n", con, con->state,
             atomic_read(&con->nref));
 
+       mutex_lock(&con->out_mutex);
 more:
        dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
 
@@ -1206,6 +1231,15 @@ more:
 
 more_kvec:
        /* kvec data queued? */
+       if (con->out_skip) {
+               ret = write_partial_skip(con);
+               if (ret <= 0)
+                       goto done;
+               if (ret < 0) {
+                       dout("try_write write_partial_skip err %d\n", ret);
+                       goto done;
+               }
+       }
        if (con->out_kvec_left) {
                ret = write_partial_kvec(con);
                if (ret <= 0)
@@ -1232,23 +1266,18 @@ more_kvec:
 
        if (!test_bit(CONNECTING, &con->state)) {
                /* is anything else pending? */
-               spin_lock(&con->out_queue_lock);
                if (!list_empty(&con->out_queue)) {
                        prepare_write_message(con);
-                       spin_unlock(&con->out_queue_lock);
                        goto more;
                }
                if (con->in_seq > con->in_seq_acked) {
                        prepare_write_ack(con);
-                       spin_unlock(&con->out_queue_lock);
                        goto more;
                }
                if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) {
                        prepare_write_keepalive(con);
-                       spin_unlock(&con->out_queue_lock);
                        goto more;
                }
-               spin_unlock(&con->out_queue_lock);
        }
 
        /* Nothing to do! */
@@ -1257,6 +1286,7 @@ more_kvec:
 done:
        ret = 0;
 out:
+       mutex_unlock(&con->out_mutex);
        dout("try_write done on %p\n", con);
        return ret;
 }
@@ -1492,18 +1522,18 @@ static void ceph_fault(struct ceph_connection *con)
 
        /* If there are no messages in the queue, place the connection
         * in a STANDBY state (i.e., don't try to reconnect just yet). */
-       spin_lock(&con->out_queue_lock);
+       mutex_lock(&con->out_mutex);
        if (list_empty(&con->out_queue) && !con->out_keepalive_pending) {
                dout("fault setting STANDBY\n");
                set_bit(STANDBY, &con->state);
-               spin_unlock(&con->out_queue_lock);
+               mutex_unlock(&con->out_mutex);
                goto out;
        }
 
        /* Requeue anything that hasn't been acked, and retry after a
         * delay. */
        list_splice_init(&con->out_sent, &con->out_queue);
-       spin_unlock(&con->out_queue_lock);
+       mutex_unlock(&con->out_mutex);
 
        if (con->delay == 0)
                con->delay = BASE_DELAY_INTERVAL;
@@ -1571,41 +1601,6 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
        dout("destroyed messenger %p\n", msgr);
 }
 
-/*
- * A single ceph_msg can't be queued for send twice, unless it's
- * already been delivered (i.e. we have the only remaining reference),
- * because of the list_head indicating which queue it is on.
- *
- * So, we dup the message if there is more than once reference.  If it has
- * pages (a data payload), steal the pages away from the old message.
- */
-struct ceph_msg *ceph_msg_maybe_dup(struct ceph_msg *old)
-{
-       struct ceph_msg *dup;
-
-       if (atomic_read(&old->nref) == 1)
-               return old;  /* we have only ref, all is well */
-
-       dup = ceph_msg_new(le16_to_cpu(old->hdr.type),
-                          le32_to_cpu(old->hdr.front_len),
-                          le32_to_cpu(old->hdr.data_len),
-                          le16_to_cpu(old->hdr.data_off),
-                          old->pages);
-       if (!dup)
-               return ERR_PTR(-ENOMEM);
-       memcpy(dup->front.iov_base, old->front.iov_base,
-              le32_to_cpu(old->hdr.front_len));
-
-       /* revoke old message's pages */
-       mutex_lock(&old->page_mutex);
-       old->pages = NULL;
-       old->footer.flags |= cpu_to_le32(CEPH_MSG_FOOTER_ABORTED);
-       mutex_unlock(&old->page_mutex);
-
-       ceph_msg_put(old);
-       return dup;
-}
-
 /*
  * Queue up an outgoing message on the given connection.
  */
@@ -1623,20 +1618,17 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
        msg->hdr.dst_erank = con->peer_addr.erank;
 
        /* queue */
-       spin_lock(&con->out_queue_lock);
-       msg->hdr.seq = cpu_to_le64(++con->out_seq);
-       dout("----- %p %u to %s%d %d=%s len %d+%d+%d -----\n", msg,
-            (unsigned)con->out_seq,
+       mutex_lock(&con->out_mutex);
+       list_add_tail(&msg->list_head, &con->out_queue);
+       con->out_qlen++;
+       dout("----- %p %llu to %s%d %d=%s len %d+%d+%d -----\n", msg,
+            con->out_seq + con->out_qlen,
             ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
             ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
             le32_to_cpu(msg->hdr.front_len),
             le32_to_cpu(msg->hdr.middle_len),
             le32_to_cpu(msg->hdr.data_len));
-       dout("ceph_con_send %p %s%d %p seq %llu pgs %d\n",
-            con, ENTITY_NAME(con->peer_name), msg,
-            le64_to_cpu(msg->hdr.seq), msg->nr_pages);
-       list_add_tail(&msg->list_head, &con->out_queue);
-       spin_unlock(&con->out_queue_lock);
+       mutex_unlock(&con->out_mutex);
 
        /* if there wasn't anything waiting to send before, queue
         * new work */
@@ -1644,6 +1636,32 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
                queue_con(con);
 }
 
+/*
+ * Revoke a message that was previously queued for send
+ */
+void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
+{
+       mutex_lock(&con->out_mutex);
+       if (!list_empty(&msg->list_head)) {
+               dout("con_revoke %p msg %p\n", con, msg);
+               list_del_init(&msg->list_head);
+               ceph_msg_put(msg);
+               if (msg->hdr.seq == 0)
+                       con->out_qlen--;
+               else
+                       msg->hdr.seq = 0;
+               if (con->out_msg == msg)
+                       con->out_msg = NULL;
+               if (con->out_kvec_is_msg) {
+                       con->out_skip = con->out_kvec_bytes;
+                       con->out_kvec_is_msg = false;
+               }
+       } else {
+               dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg);
+       }
+       mutex_unlock(&con->out_mutex);
+}
+
 /*
  * Queue a keepalive byte to ensure the tcp connection is alive.
  */
@@ -1668,7 +1686,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
        if (m == NULL)
                goto out;
        atomic_set(&m->nref, 1);
-       mutex_init(&m->page_mutex);
        INIT_LIST_HEAD(&m->list_head);
 
        m->hdr.type = cpu_to_le16(type);
index 5a965add550f5397b40f289f0b88d74da28b40a5..3550dbea2434464fa1750a9363a9e8f342f153e0 100644 (file)
@@ -85,15 +85,13 @@ struct ceph_messenger {
 /*
  * a single message.  it contains a header (src, dest, message type, etc.),
  * footer (crc values, mainly), a "front" message body, and possibly a
- * data payload (stored in some number of pages).  The page_mutex protects
- * access to the page vector.
+ * data payload (stored in some number of pages).
  */
 struct ceph_msg {
        struct ceph_msg_header hdr;     /* header */
        struct ceph_msg_footer footer;  /* footer */
        struct kvec front;              /* unaligned blobs of message */
        struct ceph_buffer *middle;
-       struct mutex page_mutex;
        struct page **pages;            /* data payload.  NOT OWNER. */
        unsigned nr_pages;              /* size of page array */
        struct list_head list_head;
@@ -161,10 +159,12 @@ struct ceph_connection {
        u32 peer_global_seq;  /* peer's global seq for this connection */
 
        /* out queue */
-       spinlock_t out_queue_lock;   /* protects out_queue, out_sent, out_seq */
+       struct mutex out_mutex;
        struct list_head out_queue;
-       struct list_head out_sent;   /* sending/sent but unacked */
-       u32 out_seq;                 /* last message queued for send */
+       struct list_head out_sent;   /* sending or sent but unacked */
+       unsigned out_qlen;
+       u64 out_seq;                 /* last message queued for send */
+       u64 out_seq_sent;            /* last message sent */
        bool out_keepalive_pending;
 
        u32 in_seq, in_seq_acked;  /* last message received, acked */
@@ -188,10 +188,12 @@ struct ceph_connection {
                                            out_sent) */
        struct ceph_msg_pos out_msg_pos;
 
-       struct kvec out_kvec[6],         /* sending header/footer data */
+       struct kvec out_kvec[8],         /* sending header/footer data */
                *out_kvec_cur;
        int out_kvec_left;   /* kvec's left in out_kvec */
+       int out_skip;        /* skip this many bytes */
        int out_kvec_bytes;  /* total bytes left */
+       bool out_kvec_is_msg; /* kvec refers to out_msg */
        int out_more;        /* there is more data after the kvecs */
        __le32 out_temp_ack; /* for writing an ack */
 
@@ -223,6 +225,7 @@ extern void ceph_con_open(struct ceph_connection *con,
                          struct ceph_entity_addr *addr);
 extern void ceph_con_close(struct ceph_connection *con);
 extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
+extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg);
 extern void ceph_con_keepalive(struct ceph_connection *con);
 extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
 extern void ceph_con_put(struct ceph_connection *con);
@@ -239,25 +242,11 @@ extern int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg);
 
 static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
 {
+       dout("ceph_msg_get %p %d -> %d\n", msg, atomic_read(&msg->nref),
+            atomic_read(&msg->nref)+1);
        atomic_inc(&msg->nref);
        return msg;
 }
 extern void ceph_msg_put(struct ceph_msg *msg);
 
-static inline void ceph_msg_remove(struct ceph_msg *msg)
-{
-       list_del_init(&msg->list_head);
-       ceph_msg_put(msg);
-}
-static inline void ceph_msg_put_list(struct list_head *head)
-{
-       while (!list_empty(head)) {
-               struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
-                                                       list_head);
-               ceph_msg_remove(msg);
-       }
-}
-
-extern struct ceph_msg *ceph_msg_maybe_dup(struct ceph_msg *msg);
-
 #endif
index 6366dcfe836daa8b6b87a041b37b9f57e82a0a25..cf065b6110292e446a69955a1d6aabfbc0d97e6e 100644 (file)
@@ -478,6 +478,16 @@ static void __unregister_request(struct ceph_osd_client *osdc,
        }
 }
 
+/*
+ * Cancel a previously queued request message
+ */
+static void __cancel_request(struct ceph_osd_request *req)
+{
+       if (req->r_sent) {
+               ceph_con_revoke(&req->r_osd->o_con, req->r_request);
+               req->r_sent = false;
+       }
+}
 
 /*
  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
@@ -497,6 +507,7 @@ static int __map_osds(struct ceph_osd_client *osdc,
        int err;
        struct ceph_osd *newosd = NULL;
 
+       dout("map_osds %p tid %lld\n", req, req->r_tid);
        err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
                                      &req->r_file_layout, osdc->osdmap);
        if (err)
@@ -513,6 +524,7 @@ static int __map_osds(struct ceph_osd_client *osdc,
             req->r_osd ? req->r_osd->o_osd : -1);
 
        if (req->r_osd) {
+               __cancel_request(req);
                list_del_init(&req->r_osd_item);
                if (list_empty(&req->r_osd->o_requests)) {
                        /* try to re-use r_osd if possible */
@@ -555,8 +567,8 @@ out:
 /*
  * caller should hold map_sem (for read) and request_mutex
  */
-static int send_request(struct ceph_osd_client *osdc,
-                       struct ceph_osd_request *req)
+static int __send_request(struct ceph_osd_client *osdc,
+                         struct ceph_osd_request *req)
 {
        struct ceph_osd_request_head *reqhead;
        int err;
@@ -582,6 +594,7 @@ static int send_request(struct ceph_osd_client *osdc,
 
        ceph_msg_get(req->r_request); /* send consumes a ref */
        ceph_con_send(&req->r_osd->o_con, req->r_request);
+       req->r_sent = true;
        return 0;
 }
 
@@ -617,7 +630,7 @@ static void handle_timeout(struct work_struct *work)
                        int err;
 
                        dout("osdc resending prev failed %lld\n", req->r_tid);
-                       err = send_request(osdc, req);
+                       err = __send_request(osdc, req);
                        if (err)
                                dout("osdc failed again on %lld\n", req->r_tid);
                        else
@@ -691,10 +704,6 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                req->r_reply = NULL;
        }
 
-       if (req->r_aborted) {
-               dout("handle_reply tid %llu aborted\n", tid);
-               goto done;
-       }
        if (!req->r_got_reply) {
                unsigned bytes;
 
@@ -812,19 +821,12 @@ static void kick_requests(struct ceph_osd_client *osdc,
 
        kick:
                dout("kicking tid %llu osd%d\n", req->r_tid, req->r_osd->o_osd);
-               ceph_osdc_get_request(req);
-               mutex_unlock(&osdc->request_mutex);
-               req->r_request = ceph_msg_maybe_dup(req->r_request);
-               if (!req->r_aborted) {
-                       req->r_flags |= CEPH_OSD_FLAG_RETRY;
-                       err = send_request(osdc, req);
-                       if (err) {
-                               dout(" setting r_resend on %llu\n", req->r_tid);
-                               req->r_resend = true;
-                       }
+               req->r_flags |= CEPH_OSD_FLAG_RETRY;
+               err = __send_request(osdc, req);
+               if (err) {
+                       dout(" setting r_resend on %llu\n", req->r_tid);
+                       req->r_resend = true;
                }
-               ceph_osdc_put_request(req);
-               mutex_lock(&osdc->request_mutex);
        }
        mutex_unlock(&osdc->request_mutex);
 
@@ -977,8 +979,7 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m,
        }
        dout("prepare_pages tid %llu has %d pages, want %d\n",
             tid, req->r_num_pages, want);
-       if (likely(req->r_num_pages >= want && !req->r_prepared_pages &&
-                  !req->r_aborted)) {
+       if (likely(req->r_num_pages >= want && !req->r_prepared_pages)) {
                m->pages = req->r_pages;
                m->nr_pages = req->r_num_pages;
                req->r_reply = m;  /* only for duration of read over socket */
@@ -1007,7 +1008,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 
        down_read(&osdc->map_sem);
        mutex_lock(&osdc->request_mutex);
-       rc = send_request(osdc, req);
+       rc = __send_request(osdc, req);
        if (rc) {
                if (nofail) {
                        dout("osdc_start_request failed send, marking %lld\n",
@@ -1033,7 +1034,10 @@ int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
 
        rc = wait_for_completion_interruptible(&req->r_completion);
        if (rc < 0) {
-               ceph_osdc_abort_request(osdc, req);
+               mutex_lock(&osdc->request_mutex);
+               __cancel_request(req);
+               mutex_unlock(&osdc->request_mutex);
+               dout("wait_request tid %llu timed out\n", req->r_tid);
                return rc;
        }
 
@@ -1041,37 +1045,6 @@ int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
        return req->r_result;
 }
 
-/*
- * To abort an in-progress request, take pages away from outgoing or
- * incoming message.
- */
-void ceph_osdc_abort_request(struct ceph_osd_client *osdc,
-                            struct ceph_osd_request *req)
-{
-       struct ceph_msg *msg;
-
-       pr_err("abort_request tid %llu, revoking %p pages\n", req->r_tid,
-            req->r_request);
-       /*
-        * mark req aborted _before_ revoking pages, so that
-        * if a racing kick_request _does_ dup the page vec
-        * pointer, it will definitely then see the aborted
-        * flag and not send the request.
-        */
-       req->r_aborted = 1;
-       msg = req->r_request;
-       mutex_lock(&msg->page_mutex);
-       msg->pages = NULL;
-       mutex_unlock(&msg->page_mutex);
-       if (req->r_reply) {
-               mutex_lock(&req->r_reply->page_mutex);
-               req->r_reply->pages = NULL;
-               mutex_unlock(&req->r_reply->page_mutex);
-               ceph_msg_put(req->r_reply);
-               req->r_reply = NULL;
-       }
-}
-
 /*
  * sync - wait for all in-flight requests to flush.  avoid starvation.
  */
index b3e574c557539fed1d24bf65a8a9516ecf305b6b..36a5e4be6cf633c2e44c1d4eb0dbc415e0ab2a7e 100644 (file)
@@ -40,7 +40,7 @@ struct ceph_osd_request {
        struct ceph_msg  *r_request, *r_reply;
        int               r_result;
        int               r_flags;     /* any additional flags for the osd */
-       int               r_aborted;   /* set if we cancel this request */
+       bool              r_sent;      /* true if r_request is sending/sent */
        int r_prepared_pages, r_got_reply;
 
        struct ceph_osd_client *r_osdc;
@@ -117,8 +117,6 @@ extern int ceph_osdc_start_request(struct ceph_osd_client *osdc,
                                   bool nofail);
 extern int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
                                  struct ceph_osd_request *req);
-extern void ceph_osdc_abort_request(struct ceph_osd_client *osdc,
-                                   struct ceph_osd_request *req);
 extern void ceph_osdc_sync(struct ceph_osd_client *osdc);
 
 extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
index 4bee029844033dfb3251c07032ffe7f1e4525f86..644b2bd41d2a386d1d58b74f6fdca6ec243aff5d 100644 (file)
@@ -1598,7 +1598,7 @@ Message *SimpleMessenger::Pipe::read_message()
   if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0) 
     return 0;
   
-  int aborted = (le32_to_cpu(footer.flags) & CEPH_MSG_FOOTER_ABORTED);
+  int aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
   dout(10) << "aborted = " << aborted << dendl;
   if (aborted) {
     dout(0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
@@ -1750,7 +1750,7 @@ int SimpleMessenger::Pipe::write_message(Message *m)
   header.front_len = m->get_payload().length();
   header.middle_len = m->get_middle().length();
   header.data_len = m->get_data().length();
-  footer.flags = 0;
+  footer.flags = CEPH_MSG_FOOTER_COMPLETE;
   m->calc_header_crc();
 
   bufferlist blist = m->get_payload();