* Reset a connection. Discard all incoming and outgoing messages
* and clear *_seq state.
*/
+static void ceph_msg_remove(struct ceph_msg *msg)
+{
+ list_del_init(&msg->list_head);
+ ceph_msg_put(msg);
+}
+
+static void ceph_msg_remove_list(struct list_head *head)
+{
+ while (!list_empty(head)) {
+ struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
+ list_head);
+ ceph_msg_remove(msg);
+ }
+}
+
static void reset_connection(struct ceph_connection *con)
{
/* reset connection, out_queue, msg_ and connect_seq */
/* discard existing out_queue and msg_seq */
- spin_lock(&con->out_queue_lock);
- ceph_msg_put_list(&con->out_queue);
- ceph_msg_put_list(&con->out_sent);
+ mutex_lock(&con->out_mutex);
+ ceph_msg_remove_list(&con->out_queue);
+ ceph_msg_remove_list(&con->out_sent);
con->connect_seq = 0;
con->out_seq = 0;
+ con->out_qlen = 0;
con->out_msg = NULL;
con->in_seq = 0;
- spin_unlock(&con->out_queue_lock);
+ mutex_unlock(&con->out_mutex);
}
/*
memset(con, 0, sizeof(*con));
atomic_set(&con->nref, 1);
con->msgr = msgr;
- spin_lock_init(&con->out_queue_lock);
+ mutex_init(&con->out_mutex);
INIT_LIST_HEAD(&con->out_queue);
INIT_LIST_HEAD(&con->out_sent);
INIT_DELAYED_WORK(&con->work, con_work);
struct ceph_msg *m = con->out_msg;
dout("prepare_write_message_footer %p\n", con);
+ con->out_kvec_is_msg = true;
con->out_kvec[v].iov_base = &m->footer;
con->out_kvec[v].iov_len = sizeof(m->footer);
con->out_kvec_bytes += sizeof(m->footer);
int v = 0;
con->out_kvec_bytes = 0;
+ con->out_kvec_is_msg = true;
/* Sneak an ack in there first? If we can get it into the same
* TCP packet that's a good thing. */
list_move_tail(&m->list_head, &con->out_sent);
con->out_msg = m; /* we don't bother taking a reference here. */
+ BUG_ON(!con->out_qlen);
+ con->out_qlen--;
+
+ m->hdr.seq = cpu_to_le64(++con->out_seq);
+
dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
- m, le64_to_cpu(m->hdr.seq), le16_to_cpu(m->hdr.type),
+ m, con->out_seq, le16_to_cpu(m->hdr.type),
le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
le32_to_cpu(m->hdr.data_len),
m->nr_pages);
con->out_msg->hdr.crc =
cpu_to_le32(crc32c(0, (void *)&m->hdr,
sizeof(m->hdr) - sizeof(m->hdr.crc)));
- con->out_msg->footer.flags = 0;
+ con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
con->out_msg->footer.front_crc =
cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len));
if (m->middle)
}
}
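
The header crc above is computed over sizeof(m->hdr) - sizeof(m->hdr.crc) bytes, which quietly relies on crc being the last member of the (packed, like the on-wire ceph structs) header. A minimal userspace check of that layout assumption; struct hdr_model below is a stand-in, not the real wire header:

    #include <assert.h>
    #include <stddef.h>
    #include <stdint.h>

    /* stand-in for ceph_msg_header; only the position of crc matters */
    struct hdr_model {
        uint64_t seq;
        uint16_t type;
        uint32_t front_len, middle_len, data_len;
        uint32_t crc;        /* must be the last member */
    } __attribute__((packed));

    int main(void)
    {
        /* crc32c(0, &hdr, sizeof(hdr) - sizeof(hdr.crc)) covers every
         * byte before the crc field only if crc is last and the struct
         * is packed (no tail padding) */
        assert(offsetof(struct hdr_model, crc) ==
               sizeof(struct hdr_model) - sizeof(uint32_t));
        return 0;
    }
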
con->out_kvec_left = 0;
+ con->out_kvec_is_msg = false;
ret = 1;
out:
dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
* to map the page. if our pages[] has been revoked, use the
* zero page.
*/
- mutex_lock(&msg->page_mutex);
if (msg->pages) {
page = msg->pages[con->out_msg_pos.page];
if (crc)
if (crc && msg->pages)
kunmap(page);
- mutex_unlock(&msg->page_mutex);
if (ret <= 0)
goto out;
/* prepare and queue up footer, too */
if (!crc)
- con->out_msg->footer.flags |=
- cpu_to_le32(CEPH_MSG_FOOTER_NOCRC);
+ con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
con->out_kvec_bytes = 0;
con->out_kvec_left = 0;
con->out_kvec_cur = con->out_kvec;
return ret;
}
+/*
+ * Write out as much of a pending skip as we can.  Zeros are sent in
+ * place of the remaining bytes of a message that was revoked
+ * mid-write (see ceph_con_revoke()), since we may no longer touch
+ * that message's kvecs.
+ */
+static int write_partial_skip(struct ceph_connection *con)
+{
+ int ret;
+ while (con->out_skip > 0) {
+ struct kvec iov = {
+ .iov_base = page_address(con->msgr->zero_page),
+ .iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE)
+ };
+
+ ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1);
+ if (ret <= 0)
+ goto out;
+ con->out_skip -= ret;
+ }
+ ret = 1;
+out:
+ return ret;
+}
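
For illustration, a small userspace model of this loop. Assumptions: fake_sendmsg() and the sizes are invented; the kernel sends from msgr->zero_page and simply returns to the worker when the socket stalls:

    #include <assert.h>

    #define PAGE_CACHE_SIZE 4096

    /* stand-in for ceph_tcp_sendmsg(); accepts at most 1500 bytes per
     * call so the short-write path is exercised */
    static int fake_sendmsg(int len)
    {
        return len < 1500 ? len : 1500;
    }

    /* mirrors write_partial_skip(): emit zeros in at-most-page-sized
     * chunks until out_skip drains or the socket stalls */
    static int skip_bytes(int *out_skip)
    {
        while (*out_skip > 0) {
            int len = *out_skip < PAGE_CACHE_SIZE
                    ? *out_skip : PAGE_CACHE_SIZE;
            int ret = fake_sendmsg(len);
            if (ret <= 0)
                return ret;    /* 0: retry later, <0: error */
            *out_skip -= ret;
        }
        return 1;
    }

    int main(void)
    {
        int out_skip = 10000;    /* leftover bytes of a revoked message */
        assert(skip_bytes(&out_skip) == 1);
        assert(out_skip == 0);
        return 0;
    }
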
/*
* Prepare to read connection handshake, or an ack.
u32 ack = le32_to_cpu(con->in_temp_ack);
u64 seq;
- spin_lock(&con->out_queue_lock);
+ mutex_lock(&con->out_mutex);
while (!list_empty(&con->out_sent)) {
m = list_first_entry(&con->out_sent, struct ceph_msg,
list_head);
le16_to_cpu(m->hdr.type), m);
ceph_msg_remove(m);
}
- spin_unlock(&con->out_queue_lock);
+ mutex_unlock(&con->out_mutex);
prepare_read_tag(con);
}
while (con->in_msg_pos.data_pos < data_len) {
left = min((int)(data_len - con->in_msg_pos.data_pos),
(int)(PAGE_SIZE - con->in_msg_pos.page_pos));
- mutex_lock(&m->page_mutex);
- if (!m->pages) {
- dout("%p pages revoked during msg read\n", m);
- mutex_unlock(&m->page_mutex);
- con->in_base_pos = con->in_msg_pos.data_pos
- - data_len - sizeof(m->footer);
- ceph_msg_put(m);
- con->in_msg = NULL;
- con->in_tag = CEPH_MSGR_TAG_READY;
- return 0;
- }
+ BUG_ON(m->pages == NULL);
p = kmap(m->pages[con->in_msg_pos.page]);
ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
left);
crc32c(con->in_data_crc,
p + con->in_msg_pos.page_pos, ret);
kunmap(m->pages[con->in_msg_pos.page]);
- mutex_unlock(&m->page_mutex);
if (ret <= 0)
return ret;
con->in_msg_pos.data_pos += ret;
return -EBADMSG;
}
if (datacrc &&
- (le32_to_cpu(m->footer.flags) & CEPH_MSG_FOOTER_NOCRC) == 0 &&
+ (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
pr_err("ceph read_partial_message %p data crc %u != exp. %u\n",
m,
if (con->peer_name.type == 0)
con->peer_name = msg->hdr.src.name;
- spin_lock(&con->out_queue_lock);
+ mutex_lock(&con->out_mutex);
con->in_seq++;
- spin_unlock(&con->out_queue_lock);
+ mutex_unlock(&con->out_mutex);
dout("===== %p %llu from %s%d %d=%s len %d+%d (%u %u %u) =====\n",
msg, le64_to_cpu(msg->hdr.seq),
}
-
-
-
-
-
-
/*
* Write something to the socket. Called in a worker thread when the
* socket appears to be writeable and we have something ready to send.
dout("try_write start %p state %lu nref %d\n", con, con->state,
atomic_read(&con->nref));
+ mutex_lock(&con->out_mutex);
more:
dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
more_kvec:
/* kvec data queued? */
+ if (con->out_skip) {
+ ret = write_partial_skip(con);
+ if (ret == 0)
+ goto done;
+ if (ret < 0) {
+ dout("try_write write_partial_skip err %d\n", ret);
+ goto done;
+ }
+ }
if (con->out_kvec_left) {
ret = write_partial_kvec(con);
if (ret <= 0)
if (!test_bit(CONNECTING, &con->state)) {
/* is anything else pending? */
- spin_lock(&con->out_queue_lock);
if (!list_empty(&con->out_queue)) {
prepare_write_message(con);
- spin_unlock(&con->out_queue_lock);
goto more;
}
if (con->in_seq > con->in_seq_acked) {
prepare_write_ack(con);
- spin_unlock(&con->out_queue_lock);
goto more;
}
if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) {
prepare_write_keepalive(con);
- spin_unlock(&con->out_queue_lock);
goto more;
}
- spin_unlock(&con->out_queue_lock);
}
/* Nothing to do! */
done:
ret = 0;
out:
+ mutex_unlock(&con->out_mutex);
dout("try_write done on %p\n", con);
return ret;
}
/* If there are no messages in the queue, place the connection
* in a STANDBY state (i.e., don't try to reconnect just yet). */
- spin_lock(&con->out_queue_lock);
+ mutex_lock(&con->out_mutex);
if (list_empty(&con->out_queue) && !con->out_keepalive_pending) {
dout("fault setting STANDBY\n");
set_bit(STANDBY, &con->state);
- spin_unlock(&con->out_queue_lock);
+ mutex_unlock(&con->out_mutex);
goto out;
}
/* Requeue anything that hasn't been acked, and retry after a
* delay. */
list_splice_init(&con->out_sent, &con->out_queue);
- spin_unlock(&con->out_queue_lock);
+ mutex_unlock(&con->out_mutex);
if (con->delay == 0)
con->delay = BASE_DELAY_INTERVAL;
dout("destroyed messenger %p\n", msgr);
}
-/*
- * A single ceph_msg can't be queued for send twice, unless it's
- * already been delivered (i.e. we have the only remaining reference),
- * because of the list_head indicating which queue it is on.
- *
- * So, we dup the message if there is more than once reference. If it has
- * pages (a data payload), steal the pages away from the old message.
- */
-struct ceph_msg *ceph_msg_maybe_dup(struct ceph_msg *old)
-{
- struct ceph_msg *dup;
-
- if (atomic_read(&old->nref) == 1)
- return old; /* we have only ref, all is well */
-
- dup = ceph_msg_new(le16_to_cpu(old->hdr.type),
- le32_to_cpu(old->hdr.front_len),
- le32_to_cpu(old->hdr.data_len),
- le16_to_cpu(old->hdr.data_off),
- old->pages);
- if (!dup)
- return ERR_PTR(-ENOMEM);
- memcpy(dup->front.iov_base, old->front.iov_base,
- le32_to_cpu(old->hdr.front_len));
-
- /* revoke old message's pages */
- mutex_lock(&old->page_mutex);
- old->pages = NULL;
- old->footer.flags |= cpu_to_le32(CEPH_MSG_FOOTER_ABORTED);
- mutex_unlock(&old->page_mutex);
-
- ceph_msg_put(old);
- return dup;
-}
-
/*
* Queue up an outgoing message on the given connection.
*/
msg->hdr.dst_erank = con->peer_addr.erank;
/* queue */
- spin_lock(&con->out_queue_lock);
- msg->hdr.seq = cpu_to_le64(++con->out_seq);
- dout("----- %p %u to %s%d %d=%s len %d+%d+%d -----\n", msg,
- (unsigned)con->out_seq,
+ mutex_lock(&con->out_mutex);
+ list_add_tail(&msg->list_head, &con->out_queue);
+ con->out_qlen++;
+ dout("----- %p %llu to %s%d %d=%s len %d+%d+%d -----\n", msg,
+ con->out_seq + con->out_qlen,
ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
le32_to_cpu(msg->hdr.front_len),
le32_to_cpu(msg->hdr.middle_len),
le32_to_cpu(msg->hdr.data_len));
- dout("ceph_con_send %p %s%d %p seq %llu pgs %d\n",
- con, ENTITY_NAME(con->peer_name), msg,
- le64_to_cpu(msg->hdr.seq), msg->nr_pages);
- list_add_tail(&msg->list_head, &con->out_queue);
- spin_unlock(&con->out_queue_lock);
+ mutex_unlock(&con->out_mutex);
/* if there wasn't anything waiting to send before, queue
* new work */
queue_con(con);
}
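
The dout above can only predict the seq as out_seq + out_qlen because seq assignment now happens lazily in prepare_write_message(). A toy userspace model of that bookkeeping (all names below are invented, not kernel API):

    #include <assert.h>
    #include <stdint.h>

    struct model { uint64_t out_seq; unsigned out_qlen; };

    /* ceph_con_send(): queue only; predict the eventual seq */
    static uint64_t model_send(struct model *m)
    {
        m->out_qlen++;
        return m->out_seq + m->out_qlen;
    }

    /* prepare_write_message(): the seq is consumed here */
    static uint64_t model_prepare(struct model *m)
    {
        assert(m->out_qlen);        /* BUG_ON(!con->out_qlen) */
        m->out_qlen--;
        return ++m->out_seq;
    }

    /* ceph_con_revoke() on a message whose seq is still 0 */
    static void model_revoke_unsent(struct model *m)
    {
        m->out_qlen--;              /* no seq was ever consumed */
    }

    int main(void)
    {
        struct model m = { 0, 0 };
        assert(model_send(&m) == 1);    /* will be seq 1 */
        assert(model_send(&m) == 2);    /* would be seq 2 ... */
        model_revoke_unsent(&m);        /* ... but it is revoked */
        assert(model_prepare(&m) == 1); /* first msg still gets 1 */
        return 0;
    }
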
+/*
+ * Revoke a message that was previously queued for send
+ */
+void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
+{
+ mutex_lock(&con->out_mutex);
+ if (!list_empty(&msg->list_head)) {
+ dout("con_revoke %p msg %p\n", con, msg);
+ list_del_init(&msg->list_head);
+ /* prepare_write_message() assigns the seq and drops out_qlen
+ * when it pulls a message off out_queue, so seq == 0 means the
+ * message is still counted in out_qlen. */
+ if (msg->hdr.seq == 0)
+ con->out_qlen--;
+ else
+ msg->hdr.seq = 0;
+ if (con->out_msg == msg)
+ con->out_msg = NULL;
+ ceph_msg_put(msg); /* drop the list's reference */
+ if (con->out_kvec_is_msg) {
+ con->out_skip = con->out_kvec_bytes;
+ con->out_kvec_is_msg = false;
+ }
+ } else {
+ dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg);
+ }
+ mutex_unlock(&con->out_mutex);
+}
+
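
A hedged usage sketch of the new API (illustrative only; it mirrors what __cancel_request() in the osd_client hunks below does, and relies on the diff's own rule that ceph_con_send() consumes a reference):

    /* caller still holds its own reference to msg */
    static void example_requeue(struct ceph_connection *con,
                                struct ceph_msg *msg)
    {
        ceph_con_revoke(con, msg);    /* off out_queue/out_sent */
        ceph_msg_get(msg);            /* send consumes a ref */
        ceph_con_send(con, msg);      /* requeued; gets a fresh seq */
    }
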
/*
* Queue a keepalive byte to ensure the tcp connection is alive.
*/
if (m == NULL)
goto out;
atomic_set(&m->nref, 1);
- mutex_init(&m->page_mutex);
INIT_LIST_HEAD(&m->list_head);
m->hdr.type = cpu_to_le16(type);
/*
* a single message. it contains a header (src, dest, message type, etc.),
* footer (crc values, mainly), a "front" message body, and possibly a
- * data payload (stored in some number of pages). The page_mutex protects
- * access to the page vector.
+ * data payload (stored in some number of pages).
*/
struct ceph_msg {
struct ceph_msg_header hdr; /* header */
struct ceph_msg_footer footer; /* footer */
struct kvec front; /* unaligned blobs of message */
struct ceph_buffer *middle;
- struct mutex page_mutex;
struct page **pages; /* data payload. NOT OWNER. */
unsigned nr_pages; /* size of page array */
struct list_head list_head;
u32 peer_global_seq; /* peer's global seq for this connection */
/* out queue */
- spinlock_t out_queue_lock; /* protects out_queue, out_sent, out_seq */
+ struct mutex out_mutex;
struct list_head out_queue;
- struct list_head out_sent; /* sending/sent but unacked */
- u32 out_seq; /* last message queued for send */
+ struct list_head out_sent; /* sending or sent but unacked */
+ unsigned out_qlen; /* # messages in out_queue */
+ u64 out_seq; /* last message queued for send */
+ u64 out_seq_sent; /* last message sent */
bool out_keepalive_pending;
u32 in_seq, in_seq_acked; /* last message received, acked */
out_sent) */
struct ceph_msg_pos out_msg_pos;
- struct kvec out_kvec[6], /* sending header/footer data */
+ struct kvec out_kvec[8], /* sending header/footer data */
*out_kvec_cur;
int out_kvec_left; /* kvec's left in out_kvec */
+ int out_skip; /* skip this many bytes */
int out_kvec_bytes; /* total bytes left */
+ bool out_kvec_is_msg; /* kvec refers to out_msg */
int out_more; /* there is more data after the kvecs */
__le32 out_temp_ack; /* for writing an ack */
struct ceph_entity_addr *addr);
extern void ceph_con_close(struct ceph_connection *con);
extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
+extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg);
extern void ceph_con_keepalive(struct ceph_connection *con);
extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
extern void ceph_con_put(struct ceph_connection *con);
static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
{
+ dout("ceph_msg_get %p %d -> %d\n", msg, atomic_read(&msg->nref),
+ atomic_read(&msg->nref)+1);
atomic_inc(&msg->nref);
return msg;
}
extern void ceph_msg_put(struct ceph_msg *msg);
-static inline void ceph_msg_remove(struct ceph_msg *msg)
-{
- list_del_init(&msg->list_head);
- ceph_msg_put(msg);
-}
-static inline void ceph_msg_put_list(struct list_head *head)
-{
- while (!list_empty(head)) {
- struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
- list_head);
- ceph_msg_remove(msg);
- }
-}
-
-extern struct ceph_msg *ceph_msg_maybe_dup(struct ceph_msg *msg);
-
#endif
}
}
+/*
+ * Cancel a previously queued request message.
+ *
+ * Caller should hold osdc->request_mutex.
+ */
+static void __cancel_request(struct ceph_osd_request *req)
+{
+ if (req->r_sent) {
+ ceph_con_revoke(&req->r_osd->o_con, req->r_request);
+ req->r_sent = false;
+ }
+}
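
The r_sent flag makes cancellation idempotent: only the first __cancel_request() after a send actually revokes the message. A toy model (invented names) of that guarantee:

    #include <assert.h>
    #include <stdbool.h>

    struct req_model { bool sent; int revokes; };

    static void model_send(struct req_model *r)
    {
        r->sent = true;              /* __send_request() */
    }

    static void model_cancel(struct req_model *r)
    {
        if (r->sent) {               /* __cancel_request() */
            r->revokes++;            /* stands in for ceph_con_revoke() */
            r->sent = false;
        }
    }

    int main(void)
    {
        struct req_model r = { false, 0 };
        model_cancel(&r);            /* never sent: no-op */
        model_send(&r);
        model_cancel(&r);
        model_cancel(&r);            /* second cancel: no-op */
        assert(r.revokes == 1);
        return 0;
    }
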
/*
* Pick an osd (the first 'up' osd in the pg), allocate the osd struct
int err;
struct ceph_osd *newosd = NULL;
+ dout("map_osds %p tid %lld\n", req, req->r_tid);
err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
&req->r_file_layout, osdc->osdmap);
if (err)
req->r_osd ? req->r_osd->o_osd : -1);
if (req->r_osd) {
+ __cancel_request(req);
list_del_init(&req->r_osd_item);
if (list_empty(&req->r_osd->o_requests)) {
/* try to re-use r_osd if possible */
/*
* caller should hold map_sem (for read) and request_mutex
*/
-static int send_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req)
+static int __send_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req)
{
struct ceph_osd_request_head *reqhead;
int err;
ceph_msg_get(req->r_request); /* send consumes a ref */
ceph_con_send(&req->r_osd->o_con, req->r_request);
+ req->r_sent = true;
return 0;
}
int err;
dout("osdc resending prev failed %lld\n", req->r_tid);
- err = send_request(osdc, req);
+ err = __send_request(osdc, req);
if (err)
dout("osdc failed again on %lld\n", req->r_tid);
else
req->r_reply = NULL;
}
- if (req->r_aborted) {
- dout("handle_reply tid %llu aborted\n", tid);
- goto done;
- }
if (!req->r_got_reply) {
unsigned bytes;
kick:
dout("kicking tid %llu osd%d\n", req->r_tid, req->r_osd->o_osd);
- ceph_osdc_get_request(req);
- mutex_unlock(&osdc->request_mutex);
- req->r_request = ceph_msg_maybe_dup(req->r_request);
- if (!req->r_aborted) {
- req->r_flags |= CEPH_OSD_FLAG_RETRY;
- err = send_request(osdc, req);
- if (err) {
- dout(" setting r_resend on %llu\n", req->r_tid);
- req->r_resend = true;
- }
+ req->r_flags |= CEPH_OSD_FLAG_RETRY;
+ err = __send_request(osdc, req);
+ if (err) {
+ dout(" setting r_resend on %llu\n", req->r_tid);
+ req->r_resend = true;
}
- ceph_osdc_put_request(req);
- mutex_lock(&osdc->request_mutex);
}
mutex_unlock(&osdc->request_mutex);
}
dout("prepare_pages tid %llu has %d pages, want %d\n",
tid, req->r_num_pages, want);
- if (likely(req->r_num_pages >= want && !req->r_prepared_pages &&
- !req->r_aborted)) {
+ if (likely(req->r_num_pages >= want && !req->r_prepared_pages)) {
m->pages = req->r_pages;
m->nr_pages = req->r_num_pages;
req->r_reply = m; /* only for duration of read over socket */
down_read(&osdc->map_sem);
mutex_lock(&osdc->request_mutex);
- rc = send_request(osdc, req);
+ rc = __send_request(osdc, req);
if (rc) {
if (nofail) {
dout("osdc_start_request failed send, marking %lld\n",
rc = wait_for_completion_interruptible(&req->r_completion);
if (rc < 0) {
- ceph_osdc_abort_request(osdc, req);
+ mutex_lock(&osdc->request_mutex);
+ __cancel_request(req);
+ mutex_unlock(&osdc->request_mutex);
+ dout("wait_request tid %llu timed out\n", req->r_tid);
return rc;
}
return req->r_result;
}
-/*
- * To abort an in-progress request, take pages away from outgoing or
- * incoming message.
- */
-void ceph_osdc_abort_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req)
-{
- struct ceph_msg *msg;
-
- pr_err("abort_request tid %llu, revoking %p pages\n", req->r_tid,
- req->r_request);
- /*
- * mark req aborted _before_ revoking pages, so that
- * if a racing kick_request _does_ dup the page vec
- * pointer, it will definitely then see the aborted
- * flag and not send the request.
- */
- req->r_aborted = 1;
- msg = req->r_request;
- mutex_lock(&msg->page_mutex);
- msg->pages = NULL;
- mutex_unlock(&msg->page_mutex);
- if (req->r_reply) {
- mutex_lock(&req->r_reply->page_mutex);
- req->r_reply->pages = NULL;
- mutex_unlock(&req->r_reply->page_mutex);
- ceph_msg_put(req->r_reply);
- req->r_reply = NULL;
- }
-}
-
/*
* sync - wait for all in-flight requests to flush. avoid starvation.
*/