From: Sage Weil Date: Wed, 9 Sep 2009 23:46:42 +0000 (-0700) Subject: kclient: allow sent msgs to be revoked from a connection X-Git-Tag: v0.15~91 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5f3b04698244ed7255b0ae95f068249c1691c38f;p=ceph.git kclient: allow sent msgs to be revoked from a connection The simplest solution seemed to be to replace the spinlock with a mutex and carry it for the duration of try_write() (including the NOWAIT calls to kernel_sendmsg, etc). If there is contention the overhead will be a bit higher, but the code is much simpler. --- diff --git a/src/TODO b/src/TODO index 0e029d0b8ec..b06de2f66d0 100644 --- a/src/TODO +++ b/src/TODO @@ -38,9 +38,6 @@ v0.15 - msgr - check protocols during connection handshake, not per-message? -- kclient msgr - - allow msg to be revoked from connection (per-msg queue_seq, mutex) - - and then kill maybe_dup_msg bugs @@ -91,7 +88,6 @@ repair kclient - fix up ESTALE handling -- allow struct ceph_msg revoke from a connection - don't retry on ENOMEM on non-nofail requests in kick_requests - make cap import/export more efficient? - flock, fnctl locks diff --git a/src/include/msgr.h b/src/include/msgr.h index de85ba1dfb8..e80ebfd1c7a 100644 --- a/src/include/msgr.h +++ b/src/include/msgr.h @@ -21,7 +21,7 @@ * whenever the wire protocol changes. try to keep this string length * constant. */ -#define CEPH_BANNER "ceph v016" +#define CEPH_BANNER "ceph v017" #define CEPH_BANNER_MAX_LEN 30 @@ -147,11 +147,11 @@ struct ceph_msg_header { * follows data payload */ struct ceph_msg_footer { - __le32 flags; __le32 front_crc, middle_crc, data_crc; + __u8 flags; } __attribute__ ((packed)); -#define CEPH_MSG_FOOTER_ABORTED (1<<0) /* drop this message */ +#define CEPH_MSG_FOOTER_COMPLETE (1<<0) /* msg wasn't aborted */ #define CEPH_MSG_FOOTER_NOCRC (1<<1) /* no data crc */ diff --git a/src/kernel/caps.c b/src/kernel/caps.c index a6f16d4b0dc..35aec4fb864 100644 --- a/src/kernel/caps.c +++ b/src/kernel/caps.c @@ -2682,6 +2682,10 @@ void ceph_put_fmode(struct ceph_inode_info *ci, int fmode) /* * Helpers for embedding cap and dentry lease releases into mds * requests. + * + * @force is used by dentry_release (below) to force inclusion of a + * record for the directory inode, even when there aren't any caps to + * drop. */ int ceph_encode_inode_release(void **p, struct inode *inode, int mds, int drop, int unless, int force) diff --git a/src/kernel/mds_client.c b/src/kernel/mds_client.c index c641eb33425..daa60509d04 100644 --- a/src/kernel/mds_client.c +++ b/src/kernel/mds_client.c @@ -661,12 +661,14 @@ static void cleanup_cap_releases(struct ceph_mds_session *session) while (!list_empty(&session->s_cap_releases)) { msg = list_first_entry(&session->s_cap_releases, struct ceph_msg, list_head); - ceph_msg_remove(msg); + list_del_init(&msg->list_head); + ceph_msg_put(msg); } while (!list_empty(&session->s_cap_releases_done)) { msg = list_first_entry(&session->s_cap_releases_done, struct ceph_msg, list_head); - ceph_msg_remove(msg); + list_del_init(&msg->list_head); + ceph_msg_put(msg); } spin_unlock(&session->s_cap_lock); } diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 9d69e46fc0b..99c691aad27 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -225,19 +225,34 @@ static int con_close_socket(struct ceph_connection *con) * Reset a connection. Discard all incoming and outgoing messages * and clear *_seq state. */ +static void ceph_msg_remove(struct ceph_msg *msg) +{ + list_del_init(&msg->list_head); + ceph_msg_put(msg); +} +static void ceph_msg_remove_list(struct list_head *head) +{ + while (!list_empty(head)) { + struct ceph_msg *msg = list_first_entry(head, struct ceph_msg, + list_head); + ceph_msg_remove(msg); + } +} + static void reset_connection(struct ceph_connection *con) { /* reset connection, out_queue, msg_ and connect_seq */ /* discard existing out_queue and msg_seq */ - spin_lock(&con->out_queue_lock); - ceph_msg_put_list(&con->out_queue); - ceph_msg_put_list(&con->out_sent); + mutex_lock(&con->out_mutex); + ceph_msg_remove_list(&con->out_queue); + ceph_msg_remove_list(&con->out_sent); con->connect_seq = 0; con->out_seq = 0; + con->out_qlen = 0; con->out_msg = NULL; con->in_seq = 0; - spin_unlock(&con->out_queue_lock); + mutex_unlock(&con->out_mutex); } /* @@ -308,7 +323,7 @@ void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con) memset(con, 0, sizeof(*con)); atomic_set(&con->nref, 1); con->msgr = msgr; - spin_lock_init(&con->out_queue_lock); + mutex_init(&con->out_mutex); INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_sent); INIT_DELAYED_WORK(&con->work, con_work); @@ -341,6 +356,7 @@ static void prepare_write_message_footer(struct ceph_connection *con, int v) struct ceph_msg *m = con->out_msg; dout("prepare_write_message_footer %p\n", con); + con->out_kvec_is_msg = true; con->out_kvec[v].iov_base = &m->footer; con->out_kvec[v].iov_len = sizeof(m->footer); con->out_kvec_bytes += sizeof(m->footer); @@ -358,6 +374,7 @@ static void prepare_write_message(struct ceph_connection *con) int v = 0; con->out_kvec_bytes = 0; + con->out_kvec_is_msg = true; /* Sneak an ack in there first? If we can get it into the same * TCP packet that's a good thing. */ @@ -377,8 +394,13 @@ static void prepare_write_message(struct ceph_connection *con) list_move_tail(&m->list_head, &con->out_sent); con->out_msg = m; /* we don't bother taking a reference here. */ + BUG_ON(!con->out_qlen); + con->out_qlen--; + + m->hdr.seq = cpu_to_le64(++con->out_seq); + dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n", - m, le64_to_cpu(m->hdr.seq), le16_to_cpu(m->hdr.type), + m, con->out_seq, le16_to_cpu(m->hdr.type), le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len), le32_to_cpu(m->hdr.data_len), m->nr_pages); @@ -401,7 +423,7 @@ static void prepare_write_message(struct ceph_connection *con) con->out_msg->hdr.crc = cpu_to_le32(crc32c(0, (void *)&m->hdr, sizeof(m->hdr) - sizeof(m->hdr.crc))); - con->out_msg->footer.flags = 0; + con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE; con->out_msg->footer.front_crc = cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len)); if (m->middle) @@ -555,6 +577,7 @@ static int write_partial_kvec(struct ceph_connection *con) } } con->out_kvec_left = 0; + con->out_kvec_is_msg = false; ret = 1; out: dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, @@ -590,7 +613,6 @@ static int write_partial_msg_pages(struct ceph_connection *con) * to map the page. if our pages[] has been revoked, use the * zero page. */ - mutex_lock(&msg->page_mutex); if (msg->pages) { page = msg->pages[con->out_msg_pos.page]; if (crc) @@ -620,7 +642,6 @@ static int write_partial_msg_pages(struct ceph_connection *con) if (crc && msg->pages) kunmap(page); - mutex_unlock(&msg->page_mutex); if (ret <= 0) goto out; @@ -637,8 +658,7 @@ static int write_partial_msg_pages(struct ceph_connection *con) /* prepare and queue up footer, too */ if (!crc) - con->out_msg->footer.flags |= - cpu_to_le32(CEPH_MSG_FOOTER_NOCRC); + con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; con->out_kvec_bytes = 0; con->out_kvec_left = 0; con->out_kvec_cur = con->out_kvec; @@ -648,7 +668,28 @@ out: return ret; } +/* + * write some zeros + */ +static int write_partial_skip(struct ceph_connection *con) +{ + int ret; + while (con->out_skip > 0) { + struct kvec iov = { + .iov_base = page_address(con->msgr->zero_page), + .iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE) + }; + + ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1); + if (ret <= 0) + goto out; + con->out_skip -= ret; + } + ret = 1; +out: + return ret; +} /* * Prepare to read connection handshake, or an ack. @@ -896,7 +937,7 @@ static void process_ack(struct ceph_connection *con) u32 ack = le32_to_cpu(con->in_temp_ack); u64 seq; - spin_lock(&con->out_queue_lock); + mutex_lock(&con->out_mutex); while (!list_empty(&con->out_sent)) { m = list_first_entry(&con->out_sent, struct ceph_msg, list_head); @@ -907,7 +948,7 @@ static void process_ack(struct ceph_connection *con) le16_to_cpu(m->hdr.type), m); ceph_msg_remove(m); } - spin_unlock(&con->out_queue_lock); + mutex_unlock(&con->out_mutex); prepare_read_tag(con); } @@ -1056,17 +1097,7 @@ static int read_partial_message(struct ceph_connection *con) while (con->in_msg_pos.data_pos < data_len) { left = min((int)(data_len - con->in_msg_pos.data_pos), (int)(PAGE_SIZE - con->in_msg_pos.page_pos)); - mutex_lock(&m->page_mutex); - if (!m->pages) { - dout("%p pages revoked during msg read\n", m); - mutex_unlock(&m->page_mutex); - con->in_base_pos = con->in_msg_pos.data_pos - - data_len - sizeof(m->footer); - ceph_msg_put(m); - con->in_msg = NULL; - con->in_tag = CEPH_MSGR_TAG_READY; - return 0; - } + BUG_ON(m->pages == NULL); p = kmap(m->pages[con->in_msg_pos.page]); ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, left); @@ -1075,7 +1106,6 @@ static int read_partial_message(struct ceph_connection *con) crc32c(con->in_data_crc, p + con->in_msg_pos.page_pos, ret); kunmap(m->pages[con->in_msg_pos.page]); - mutex_unlock(&m->page_mutex); if (ret <= 0) return ret; con->in_msg_pos.data_pos += ret; @@ -1114,7 +1144,7 @@ no_data: return -EBADMSG; } if (datacrc && - (le32_to_cpu(m->footer.flags) & CEPH_MSG_FOOTER_NOCRC) == 0 && + (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 && con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { pr_err("ceph read_partial_message %p data crc %u != exp. %u\n", m, @@ -1140,9 +1170,9 @@ static void process_message(struct ceph_connection *con) if (con->peer_name.type == 0) con->peer_name = msg->hdr.src.name; - spin_lock(&con->out_queue_lock); + mutex_lock(&con->out_mutex); con->in_seq++; - spin_unlock(&con->out_queue_lock); + mutex_unlock(&con->out_mutex); dout("===== %p %llu from %s%d %d=%s len %d+%d (%u %u %u) =====\n", msg, le64_to_cpu(msg->hdr.seq), @@ -1157,12 +1187,6 @@ static void process_message(struct ceph_connection *con) } - - - - - - /* * Write something to the socket. Called in a worker thread when the * socket appears to be writeable and we have something ready to send. @@ -1175,6 +1199,7 @@ static int try_write(struct ceph_connection *con) dout("try_write start %p state %lu nref %d\n", con, con->state, atomic_read(&con->nref)); + mutex_lock(&con->out_mutex); more: dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); @@ -1206,6 +1231,15 @@ more: more_kvec: /* kvec data queued? */ + if (con->out_skip) { + ret = write_partial_skip(con); + if (ret <= 0) + goto done; + if (ret < 0) { + dout("try_write write_partial_skip err %d\n", ret); + goto done; + } + } if (con->out_kvec_left) { ret = write_partial_kvec(con); if (ret <= 0) @@ -1232,23 +1266,18 @@ more_kvec: if (!test_bit(CONNECTING, &con->state)) { /* is anything else pending? */ - spin_lock(&con->out_queue_lock); if (!list_empty(&con->out_queue)) { prepare_write_message(con); - spin_unlock(&con->out_queue_lock); goto more; } if (con->in_seq > con->in_seq_acked) { prepare_write_ack(con); - spin_unlock(&con->out_queue_lock); goto more; } if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) { prepare_write_keepalive(con); - spin_unlock(&con->out_queue_lock); goto more; } - spin_unlock(&con->out_queue_lock); } /* Nothing to do! */ @@ -1257,6 +1286,7 @@ more_kvec: done: ret = 0; out: + mutex_unlock(&con->out_mutex); dout("try_write done on %p\n", con); return ret; } @@ -1492,18 +1522,18 @@ static void ceph_fault(struct ceph_connection *con) /* If there are no messages in the queue, place the connection * in a STANDBY state (i.e., don't try to reconnect just yet). */ - spin_lock(&con->out_queue_lock); + mutex_lock(&con->out_mutex); if (list_empty(&con->out_queue) && !con->out_keepalive_pending) { dout("fault setting STANDBY\n"); set_bit(STANDBY, &con->state); - spin_unlock(&con->out_queue_lock); + mutex_unlock(&con->out_mutex); goto out; } /* Requeue anything that hasn't been acked, and retry after a * delay. */ list_splice_init(&con->out_sent, &con->out_queue); - spin_unlock(&con->out_queue_lock); + mutex_unlock(&con->out_mutex); if (con->delay == 0) con->delay = BASE_DELAY_INTERVAL; @@ -1571,41 +1601,6 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr) dout("destroyed messenger %p\n", msgr); } -/* - * A single ceph_msg can't be queued for send twice, unless it's - * already been delivered (i.e. we have the only remaining reference), - * because of the list_head indicating which queue it is on. - * - * So, we dup the message if there is more than once reference. If it has - * pages (a data payload), steal the pages away from the old message. - */ -struct ceph_msg *ceph_msg_maybe_dup(struct ceph_msg *old) -{ - struct ceph_msg *dup; - - if (atomic_read(&old->nref) == 1) - return old; /* we have only ref, all is well */ - - dup = ceph_msg_new(le16_to_cpu(old->hdr.type), - le32_to_cpu(old->hdr.front_len), - le32_to_cpu(old->hdr.data_len), - le16_to_cpu(old->hdr.data_off), - old->pages); - if (!dup) - return ERR_PTR(-ENOMEM); - memcpy(dup->front.iov_base, old->front.iov_base, - le32_to_cpu(old->hdr.front_len)); - - /* revoke old message's pages */ - mutex_lock(&old->page_mutex); - old->pages = NULL; - old->footer.flags |= cpu_to_le32(CEPH_MSG_FOOTER_ABORTED); - mutex_unlock(&old->page_mutex); - - ceph_msg_put(old); - return dup; -} - /* * Queue up an outgoing message on the given connection. */ @@ -1623,20 +1618,17 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) msg->hdr.dst_erank = con->peer_addr.erank; /* queue */ - spin_lock(&con->out_queue_lock); - msg->hdr.seq = cpu_to_le64(++con->out_seq); - dout("----- %p %u to %s%d %d=%s len %d+%d+%d -----\n", msg, - (unsigned)con->out_seq, + mutex_lock(&con->out_mutex); + list_add_tail(&msg->list_head, &con->out_queue); + con->out_qlen++; + dout("----- %p %llu to %s%d %d=%s len %d+%d+%d -----\n", msg, + con->out_seq + con->out_qlen, ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type), ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), le32_to_cpu(msg->hdr.front_len), le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len)); - dout("ceph_con_send %p %s%d %p seq %llu pgs %d\n", - con, ENTITY_NAME(con->peer_name), msg, - le64_to_cpu(msg->hdr.seq), msg->nr_pages); - list_add_tail(&msg->list_head, &con->out_queue); - spin_unlock(&con->out_queue_lock); + mutex_unlock(&con->out_mutex); /* if there wasn't anything waiting to send before, queue * new work */ @@ -1644,6 +1636,32 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) queue_con(con); } +/* + * Revoke a message that was previously queued for send + */ +void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) +{ + mutex_lock(&con->out_mutex); + if (!list_empty(&msg->list_head)) { + dout("con_revoke %p msg %p\n", con, msg); + list_del_init(&msg->list_head); + ceph_msg_put(msg); + if (msg->hdr.seq == 0) + con->out_qlen--; + else + msg->hdr.seq = 0; + if (con->out_msg == msg) + con->out_msg = NULL; + if (con->out_kvec_is_msg) { + con->out_skip = con->out_kvec_bytes; + con->out_kvec_is_msg = false; + } + } else { + dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg); + } + mutex_unlock(&con->out_mutex); +} + /* * Queue a keepalive byte to ensure the tcp connection is alive. */ @@ -1668,7 +1686,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, if (m == NULL) goto out; atomic_set(&m->nref, 1); - mutex_init(&m->page_mutex); INIT_LIST_HEAD(&m->list_head); m->hdr.type = cpu_to_le16(type); diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index 5a965add550..3550dbea243 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -85,15 +85,13 @@ struct ceph_messenger { /* * a single message. it contains a header (src, dest, message type, etc.), * footer (crc values, mainly), a "front" message body, and possibly a - * data payload (stored in some number of pages). The page_mutex protects - * access to the page vector. + * data payload (stored in some number of pages). */ struct ceph_msg { struct ceph_msg_header hdr; /* header */ struct ceph_msg_footer footer; /* footer */ struct kvec front; /* unaligned blobs of message */ struct ceph_buffer *middle; - struct mutex page_mutex; struct page **pages; /* data payload. NOT OWNER. */ unsigned nr_pages; /* size of page array */ struct list_head list_head; @@ -161,10 +159,12 @@ struct ceph_connection { u32 peer_global_seq; /* peer's global seq for this connection */ /* out queue */ - spinlock_t out_queue_lock; /* protects out_queue, out_sent, out_seq */ + struct mutex out_mutex; struct list_head out_queue; - struct list_head out_sent; /* sending/sent but unacked */ - u32 out_seq; /* last message queued for send */ + struct list_head out_sent; /* sending or sent but unacked */ + unsigned out_qlen; + u64 out_seq; /* last message queued for send */ + u64 out_seq_sent; /* last message sent */ bool out_keepalive_pending; u32 in_seq, in_seq_acked; /* last message received, acked */ @@ -188,10 +188,12 @@ struct ceph_connection { out_sent) */ struct ceph_msg_pos out_msg_pos; - struct kvec out_kvec[6], /* sending header/footer data */ + struct kvec out_kvec[8], /* sending header/footer data */ *out_kvec_cur; int out_kvec_left; /* kvec's left in out_kvec */ + int out_skip; /* skip this many bytes */ int out_kvec_bytes; /* total bytes left */ + bool out_kvec_is_msg; /* kvec refers to out_msg */ int out_more; /* there is more data after the kvecs */ __le32 out_temp_ack; /* for writing an ack */ @@ -223,6 +225,7 @@ extern void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr); extern void ceph_con_close(struct ceph_connection *con); extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg); +extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg); extern void ceph_con_keepalive(struct ceph_connection *con); extern struct ceph_connection *ceph_con_get(struct ceph_connection *con); extern void ceph_con_put(struct ceph_connection *con); @@ -239,25 +242,11 @@ extern int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg); static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg) { + dout("ceph_msg_get %p %d -> %d\n", msg, atomic_read(&msg->nref), + atomic_read(&msg->nref)+1); atomic_inc(&msg->nref); return msg; } extern void ceph_msg_put(struct ceph_msg *msg); -static inline void ceph_msg_remove(struct ceph_msg *msg) -{ - list_del_init(&msg->list_head); - ceph_msg_put(msg); -} -static inline void ceph_msg_put_list(struct list_head *head) -{ - while (!list_empty(head)) { - struct ceph_msg *msg = list_first_entry(head, struct ceph_msg, - list_head); - ceph_msg_remove(msg); - } -} - -extern struct ceph_msg *ceph_msg_maybe_dup(struct ceph_msg *msg); - #endif diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c index 6366dcfe836..cf065b61102 100644 --- a/src/kernel/osd_client.c +++ b/src/kernel/osd_client.c @@ -478,6 +478,16 @@ static void __unregister_request(struct ceph_osd_client *osdc, } } +/* + * Cancel a previously queued request message + */ +static void __cancel_request(struct ceph_osd_request *req) +{ + if (req->r_sent) { + ceph_con_revoke(&req->r_osd->o_con, req->r_request); + req->r_sent = false; + } +} /* * Pick an osd (the first 'up' osd in the pg), allocate the osd struct @@ -497,6 +507,7 @@ static int __map_osds(struct ceph_osd_client *osdc, int err; struct ceph_osd *newosd = NULL; + dout("map_osds %p tid %lld\n", req, req->r_tid); err = ceph_calc_object_layout(&reqhead->layout, req->r_oid, &req->r_file_layout, osdc->osdmap); if (err) @@ -513,6 +524,7 @@ static int __map_osds(struct ceph_osd_client *osdc, req->r_osd ? req->r_osd->o_osd : -1); if (req->r_osd) { + __cancel_request(req); list_del_init(&req->r_osd_item); if (list_empty(&req->r_osd->o_requests)) { /* try to re-use r_osd if possible */ @@ -555,8 +567,8 @@ out: /* * caller should hold map_sem (for read) and request_mutex */ -static int send_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) +static int __send_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) { struct ceph_osd_request_head *reqhead; int err; @@ -582,6 +594,7 @@ static int send_request(struct ceph_osd_client *osdc, ceph_msg_get(req->r_request); /* send consumes a ref */ ceph_con_send(&req->r_osd->o_con, req->r_request); + req->r_sent = true; return 0; } @@ -617,7 +630,7 @@ static void handle_timeout(struct work_struct *work) int err; dout("osdc resending prev failed %lld\n", req->r_tid); - err = send_request(osdc, req); + err = __send_request(osdc, req); if (err) dout("osdc failed again on %lld\n", req->r_tid); else @@ -691,10 +704,6 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) req->r_reply = NULL; } - if (req->r_aborted) { - dout("handle_reply tid %llu aborted\n", tid); - goto done; - } if (!req->r_got_reply) { unsigned bytes; @@ -812,19 +821,12 @@ static void kick_requests(struct ceph_osd_client *osdc, kick: dout("kicking tid %llu osd%d\n", req->r_tid, req->r_osd->o_osd); - ceph_osdc_get_request(req); - mutex_unlock(&osdc->request_mutex); - req->r_request = ceph_msg_maybe_dup(req->r_request); - if (!req->r_aborted) { - req->r_flags |= CEPH_OSD_FLAG_RETRY; - err = send_request(osdc, req); - if (err) { - dout(" setting r_resend on %llu\n", req->r_tid); - req->r_resend = true; - } + req->r_flags |= CEPH_OSD_FLAG_RETRY; + err = __send_request(osdc, req); + if (err) { + dout(" setting r_resend on %llu\n", req->r_tid); + req->r_resend = true; } - ceph_osdc_put_request(req); - mutex_lock(&osdc->request_mutex); } mutex_unlock(&osdc->request_mutex); @@ -977,8 +979,7 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m, } dout("prepare_pages tid %llu has %d pages, want %d\n", tid, req->r_num_pages, want); - if (likely(req->r_num_pages >= want && !req->r_prepared_pages && - !req->r_aborted)) { + if (likely(req->r_num_pages >= want && !req->r_prepared_pages)) { m->pages = req->r_pages; m->nr_pages = req->r_num_pages; req->r_reply = m; /* only for duration of read over socket */ @@ -1007,7 +1008,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, down_read(&osdc->map_sem); mutex_lock(&osdc->request_mutex); - rc = send_request(osdc, req); + rc = __send_request(osdc, req); if (rc) { if (nofail) { dout("osdc_start_request failed send, marking %lld\n", @@ -1033,7 +1034,10 @@ int ceph_osdc_wait_request(struct ceph_osd_client *osdc, rc = wait_for_completion_interruptible(&req->r_completion); if (rc < 0) { - ceph_osdc_abort_request(osdc, req); + mutex_lock(&osdc->request_mutex); + __cancel_request(req); + mutex_unlock(&osdc->request_mutex); + dout("wait_request tid %llu timed out\n", req->r_tid); return rc; } @@ -1041,37 +1045,6 @@ int ceph_osdc_wait_request(struct ceph_osd_client *osdc, return req->r_result; } -/* - * To abort an in-progress request, take pages away from outgoing or - * incoming message. - */ -void ceph_osdc_abort_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) -{ - struct ceph_msg *msg; - - pr_err("abort_request tid %llu, revoking %p pages\n", req->r_tid, - req->r_request); - /* - * mark req aborted _before_ revoking pages, so that - * if a racing kick_request _does_ dup the page vec - * pointer, it will definitely then see the aborted - * flag and not send the request. - */ - req->r_aborted = 1; - msg = req->r_request; - mutex_lock(&msg->page_mutex); - msg->pages = NULL; - mutex_unlock(&msg->page_mutex); - if (req->r_reply) { - mutex_lock(&req->r_reply->page_mutex); - req->r_reply->pages = NULL; - mutex_unlock(&req->r_reply->page_mutex); - ceph_msg_put(req->r_reply); - req->r_reply = NULL; - } -} - /* * sync - wait for all in-flight requests to flush. avoid starvation. */ diff --git a/src/kernel/osd_client.h b/src/kernel/osd_client.h index b3e574c5575..36a5e4be6cf 100644 --- a/src/kernel/osd_client.h +++ b/src/kernel/osd_client.h @@ -40,7 +40,7 @@ struct ceph_osd_request { struct ceph_msg *r_request, *r_reply; int r_result; int r_flags; /* any additional flags for the osd */ - int r_aborted; /* set if we cancel this request */ + bool r_sent; /* true if r_request is sending/sent */ int r_prepared_pages, r_got_reply; struct ceph_osd_client *r_osdc; @@ -117,8 +117,6 @@ extern int ceph_osdc_start_request(struct ceph_osd_client *osdc, bool nofail); extern int ceph_osdc_wait_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req); -extern void ceph_osdc_abort_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req); extern void ceph_osdc_sync(struct ceph_osd_client *osdc); extern int ceph_osdc_readpages(struct ceph_osd_client *osdc, diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 4bee0298440..644b2bd41d2 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -1598,7 +1598,7 @@ Message *SimpleMessenger::Pipe::read_message() if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0) return 0; - int aborted = (le32_to_cpu(footer.flags) & CEPH_MSG_FOOTER_ABORTED); + int aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0; dout(10) << "aborted = " << aborted << dendl; if (aborted) { dout(0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length() @@ -1750,7 +1750,7 @@ int SimpleMessenger::Pipe::write_message(Message *m) header.front_len = m->get_payload().length(); header.middle_len = m->get_middle().length(); header.data_len = m->get_data().length(); - footer.flags = 0; + footer.flags = CEPH_MSG_FOOTER_COMPLETE; m->calc_header_crc(); bufferlist blist = m->get_payload();