From 5a4df689fdf066c888455c7f737cdd72170e917b Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 13 Oct 2008 17:42:44 -0700 Subject: [PATCH] kclient: ping osds whose requests are timing out This ensures the client detects when the osd tcp session has closed and a reply may have been lost. If that does happen, the handle_reset handler will be called and the request(s) will be resent. --- src/kernel/messenger.c | 43 +++++++++++++++++++++++++++++++---------- src/kernel/messenger.h | 3 +++ src/kernel/osd_client.c | 35 +++++++++++++++++++++++++++++++-- src/kernel/osd_client.h | 4 +++- 4 files changed, 72 insertions(+), 13 deletions(-) diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 92fc2550ee204..d81caa4441199 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -2071,16 +2071,26 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, /* queue */ spin_lock(&con->out_queue_lock); - msg->hdr.seq = cpu_to_le64(++con->out_seq); - dout(1, "----- %p %u to %s%d %d=%s len %d+%d -----\n", msg, - (unsigned)con->out_seq, - ENTITY_NAME(msg->hdr.dst.name), le32_to_cpu(msg->hdr.type), - ceph_msg_type_name(le32_to_cpu(msg->hdr.type)), - le32_to_cpu(msg->hdr.front_len), - le32_to_cpu(msg->hdr.data_len)); - dout(2, "ceph_msg_send queuing %p seq %llu for %s%d on %p pgs %d\n", msg, - le64_to_cpu(msg->hdr.seq), ENTITY_NAME(msg->hdr.dst.name), con, msg->nr_pages); - list_add_tail(&msg->list_head, &con->out_queue); + if (unlikely(msg->hdr.type == CEPH_MSG_PING && + !list_empty(&con->out_queue) && + list_entry(con->out_queue.prev, struct ceph_msg, + list_head)->hdr.type == CEPH_MSG_PING)) { + /* don't queue multiple pings in a row */ + dout(2, "ceph_msg_send dropping dup ping\n"); + ceph_msg_put(msg); + } else { + msg->hdr.seq = cpu_to_le64(++con->out_seq); + dout(1, "----- %p %u to %s%d %d=%s len %d+%d -----\n", msg, + (unsigned)con->out_seq, + ENTITY_NAME(msg->hdr.dst.name), le32_to_cpu(msg->hdr.type), + ceph_msg_type_name(le32_to_cpu(msg->hdr.type)), + le32_to_cpu(msg->hdr.front_len), + le32_to_cpu(msg->hdr.data_len)); + dout(2, "ceph_msg_send %p seq %llu for %s%d on %p pgs %d\n", + msg, le64_to_cpu(msg->hdr.seq), + ENTITY_NAME(msg->hdr.dst.name), con, msg->nr_pages); + list_add_tail(&msg->list_head, &con->out_queue); + } spin_unlock(&con->out_queue_lock); if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) @@ -2164,3 +2174,16 @@ void ceph_msg_put(struct ceph_msg *m) } } +void ceph_ping(struct ceph_messenger *msgr, struct ceph_entity_name name, + struct ceph_entity_addr *addr) +{ + struct ceph_msg *m; + + m = ceph_msg_new(CEPH_MSG_PING, 0, 0, 0, 0); + if (!m) + return; + memset(m->front.iov_base, 0, m->front.iov_len); + m->hdr.dst.name = name; + m->hdr.dst.addr = *addr; + ceph_msg_send(msgr, m, 0); +} diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index 73e1a694b733a..c5fdb739184aa 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -183,4 +183,7 @@ extern struct ceph_msg *ceph_msg_maybe_dup(struct ceph_msg *msg); extern int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, unsigned long timeout); +extern void ceph_ping(struct ceph_messenger *msgr, struct ceph_entity_name name, + struct ceph_entity_addr *addr); + #endif diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c index 45bf52aed4c29..938d9abbf1e72 100644 --- a/src/kernel/osd_client.c +++ b/src/kernel/osd_client.c @@ -271,7 +271,9 @@ static int send_request(struct ceph_osd_client *osdc, cpu_to_le32(CEPH_ENTITY_TYPE_OSD); req->r_request->hdr.dst.name.num = cpu_to_le32(osd); req->r_request->hdr.dst.addr = osdc->osdmap->osd_addr[osd]; - req->r_last_osd = req->r_request->hdr.dst.addr; + req->r_last_osd = osd; + req->r_last_osd_addr = req->r_request->hdr.dst.addr; + req->r_last_stamp = jiffies; ceph_msg_get(req->r_request); /* send consumes a ref */ rc = ceph_msg_send(osdc->client->msgr, req->r_request, 0); @@ -359,7 +361,9 @@ static void kick_requests(struct ceph_osd_client *osdc, dout(20, "tid %llu maps to no osd\n", req->r_tid); needmap++; /* request a newer map */ - memset(&req->r_last_osd, 0, sizeof(req->r_last_osd)); + req->r_last_osd = -1; + memset(&req->r_last_osd_addr, 0, + sizeof(req->r_last_osd_addr)); continue; } if (!ceph_entity_addr_equal(&req->r_last_osd, @@ -608,11 +612,38 @@ int do_sync_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) void handle_timeout(struct work_struct *work) { + u64 next_tid = 0; struct ceph_osd_client *osdc = container_of(work, struct ceph_osd_client, timeout_work.work); + struct ceph_osd_request *req; + int got; + int timeout = osdc->client->mount_args.osd_timeout * HZ; + dout(10, "timeout\n"); down_read(&osdc->map_sem); ceph_monc_request_osdmap(&osdc->client->monc, osdc->osdmap->epoch); + + /* + * ping any osds with pending requests to ensure the communications + * channel hasn't reset + */ + mutex_lock(&osdc->request_mutex); + while (1) { + got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req, + next_tid, 1); + if (got == 0) + break; + next_tid = req->r_tid + 1; + if (time_after(jiffies, req->r_last_stamp + timeout)) { + struct ceph_entity_name n = { + .type = CEPH_ENTITY_TYPE_OSD, + .num = req->r_last_osd + }; + ceph_ping(osdc->client->msgr, n, &req->r_last_osd_addr); + } + } + mutex_unlock(&osdc->request_mutex); + up_read(&osdc->map_sem); } diff --git a/src/kernel/osd_client.h b/src/kernel/osd_client.h index ab19e60aef991..25888cb653c6b 100644 --- a/src/kernel/osd_client.h +++ b/src/kernel/osd_client.h @@ -34,7 +34,9 @@ struct ceph_osd_request { struct inode *r_inode; struct writeback_control *r_wbc; struct ceph_msg *r_request; - struct ceph_entity_addr r_last_osd; /* last osd we sent request to */ + int r_last_osd; /* last osd we sent request to */ + struct ceph_entity_addr r_last_osd_addr; + unsigned long r_last_stamp; union ceph_pg r_pgid; struct ceph_msg *r_reply; int r_result; -- 2.39.5