}
/*
- * caller should hold map_sem (for read)
+ * caller should hold map_sem (for read) and request_mutex
*/
static int send_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
dout("handle_reply tid %llu aborted\n", tid);
goto done;
}
+ if (!req->r_got_reply) {
+ struct ceph_osd_reply_head *replyhead;
+ unsigned bytes;
+
+ replyhead = msg->front.iov_base;
+ req->r_result = le32_to_cpu(replyhead->result);
+ bytes = le32_to_cpu(msg->hdr.data_len);
+ dout("handle_reply result %d bytes %d\n", req->r_result,
+ bytes);
+ if (req->r_result == 0)
+ req->r_result = bytes;
+ req->r_got_reply = 1;
+ }
+ if (req->r_reply) {
+ ceph_msg_put(req->r_reply);
+ req->r_reply = NULL;
+ }
if (req->r_reassert_version.epoch == 0) {
- /* first ack */
- if (req->r_reply == NULL) {
- /* no data payload, or r_reply would have been set by
- prepare_pages. */
- ceph_msg_get(msg);
- req->r_reply = msg;
- } else {
- /* r_reply was set by prepare_pages */
- BUG_ON(req->r_reply != msg);
- }
-
/* in case we need to replay this op, */
req->r_reassert_version = rhead->reassert_version;
} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
}
dout("prepare_pages tid %llu has %d pages, want %d\n",
tid, req->r_num_pages, want);
- if (likely(req->r_num_pages >= want && req->r_reply == NULL &&
- !req->r_aborted)) {
+ if (likely(req->r_num_pages >= want && !req->r_prepared_pages &&
+ !req->r_aborted)) {
m->pages = req->r_pages;
m->nr_pages = req->r_num_pages;
+ req->r_reply = m; /* only for duration of read over socket */
ceph_msg_get(m);
- req->r_reply = m;
+ req->r_prepared_pages = 1;
ret = 0; /* success */
}
out:
register_request(osdc, req);
down_read(&osdc->map_sem);
+ mutex_lock(&osdc->request_mutex);
rc = send_request(osdc, req);
- up_read(&osdc->map_sem);
if (rc) {
if (nofail) {
dout("osdc_start_request failed send, marking %lld\n",
req->r_resend = true;
rc = 0;
} else {
- mutex_lock(&osdc->request_mutex);
__unregister_request(osdc, req);
- mutex_unlock(&osdc->request_mutex);
}
}
+ mutex_unlock(&osdc->request_mutex);
+ up_read(&osdc->map_sem);
return rc;
}
int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
- struct ceph_osd_reply_head *replyhead;
- __s32 rc;
- int bytes;
+ int rc;
rc = wait_for_completion_interruptible(&req->r_completion);
if (rc < 0) {
return rc;
}
- /* parse reply */
- replyhead = req->r_reply->front.iov_base;
- rc = le32_to_cpu(replyhead->result);
- bytes = le32_to_cpu(req->r_reply->hdr.data_len);
- dout("wait_request tid %llu result %d, %d bytes\n",
- req->r_tid, rc, bytes);
- if (rc < 0)
- return rc;
- return bytes;
+ dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
+ return req->r_result;
}
/*