From ae0daf01c40289d1763a550ff143fc15db048c73 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 13 Oct 2008 16:19:35 -0700 Subject: [PATCH] kclient: resend osd requests if osd resets the connection --- src/TODO | 3 --- src/kernel/messenger.c | 4 +++- src/kernel/messenger.h | 3 ++- src/kernel/osd_client.c | 31 +++++++++++++++++++++---------- src/kernel/osd_client.h | 3 +++ src/kernel/super.c | 21 ++++++++++++--------- 6 files changed, 41 insertions(+), 24 deletions(-) diff --git a/src/TODO b/src/TODO index 4b08d832f9acb..99d53edd7bb26 100644 --- a/src/TODO +++ b/src/TODO @@ -1,6 +1,3 @@ -- objecter retry -- kclient retry - v0.5 - debug restart, cosd reformat, etc. - finish btrfs ioctl interface diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 5021c26a60065..92fc2550ee204 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -1439,7 +1439,8 @@ static int process_connect(struct ceph_connection *con) reset_connection(con); prepare_write_connect_retry(con->msgr, con); prepare_read_connect(con); - con->msgr->peer_reset(con->msgr->parent, &con->peer_name); + con->msgr->peer_reset(con->msgr->parent, &con->peer_addr, + &con->peer_name); break; case CEPH_MSGR_TAG_RETRY_SESSION: dout(10, @@ -1604,6 +1605,7 @@ static int process_accept(struct ceph_connection *con) /* replace connection */ __replace_connection(msgr, existing, con); con->msgr->peer_reset(con->msgr->parent, + &con->peer_addr, &con->peer_name); } else { /* old attempt or peer didn't get the READY */ diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index 2b6586a1fac42..73e1a694b733a 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -16,7 +16,8 @@ struct ceph_msg; extern struct workqueue_struct *ceph_msgr_wq; /* receive work queue */ typedef void (*ceph_msgr_dispatch_t) (void *p, struct ceph_msg *m); -typedef void (*ceph_msgr_peer_reset_t) (void *p, struct ceph_entity_name *pn); +typedef void (*ceph_msgr_peer_reset_t) (void *p, struct ceph_entity_addr *addr, + struct ceph_entity_name *pn); typedef int (*ceph_msgr_prepare_pages_t) (void *p, struct ceph_msg *m, int want); diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c index 7b4fdcc9c8238..742242cb6e41f 100644 --- a/src/kernel/osd_client.c +++ b/src/kernel/osd_client.c @@ -337,13 +337,14 @@ bad: /* * caller should hold read sem */ -static int kick_requests(struct ceph_osd_client *osdc) +static void kick_requests(struct ceph_osd_client *osdc, + struct ceph_entity_addr *who) { u64 next_tid = 0; struct ceph_osd_request *req; int got; int osd; - int ret = 0; + int needmap = 0; more: spin_lock(&osdc->request_lock); @@ -358,10 +359,11 @@ more_locked: if (osd < 0) { dout(20, "tid %llu maps to no osd\n", req->r_tid); - ret++; /* request a newer map */ + needmap++; /* request a newer map */ memset(&req->r_last_osd, 0, sizeof(req->r_last_osd)); } else if (!ceph_entity_addr_equal(&req->r_last_osd, - &osdc->osdmap->osd_addr[osd])) { + &osdc->osdmap->osd_addr[osd]) || + (who && ceph_entity_addr_equal(&req->r_last_osd, who))) { dout(20, "kicking tid %llu osd%d\n", req->r_tid, osd); get_request(req); spin_unlock(&osdc->request_lock); @@ -377,10 +379,12 @@ more_locked: done: spin_unlock(&osdc->request_lock); - if (ret) + if (needmap) { dout(10, "%d requests pending on down osds, need new map\n", - ret); - return ret; + needmap); + ceph_monc_request_osdmap(&osdc->client->monc, + osdc->osdmap->epoch); + } } void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) @@ -478,9 +482,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) done: downgrade_write(&osdc->map_sem); ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); - if (newmap && kick_requests(osdc)) - ceph_monc_request_osdmap(&osdc->client->monc, - osdc->osdmap->epoch); + if (newmap) + kick_requests(osdc, 0); up_read(&osdc->map_sem); return; @@ -491,6 +494,14 @@ bad: } +void ceph_osdc_handle_reset(struct ceph_osd_client *osdc, + struct ceph_entity_addr *addr) +{ + down_read(&osdc->map_sem); + kick_requests(osdc, addr); + up_read(&osdc->map_sem); +} + /* * find pages for message payload to be read into. diff --git a/src/kernel/osd_client.h b/src/kernel/osd_client.h index 5664634cf015b..50759d86045c5 100644 --- a/src/kernel/osd_client.h +++ b/src/kernel/osd_client.h @@ -69,6 +69,9 @@ extern void ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client); extern void ceph_osdc_stop(struct ceph_osd_client *osdc); +extern void ceph_osdc_handle_reset(struct ceph_osd_client *osdc, + struct ceph_entity_addr *addr); + extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg); extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, diff --git a/src/kernel/super.c b/src/kernel/super.c index 233d97659e3aa..9ba3dc6867c5f 100644 --- a/src/kernel/super.c +++ b/src/kernel/super.c @@ -40,7 +40,8 @@ int ceph_debug_super = -1; #include "mon_client.h" void ceph_dispatch(void *p, struct ceph_msg *msg); -void ceph_peer_reset(void *p, struct ceph_entity_name *peer_name); +void ceph_peer_reset(void *p, struct ceph_entity_addr *peer_addr, + struct ceph_entity_name *peer_name); /* @@ -339,17 +340,19 @@ const char *ceph_msg_type_name(int type) return "unknown"; } -void ceph_peer_reset(void *p, struct ceph_entity_name *peer_name) +void ceph_peer_reset(void *p, struct ceph_entity_addr *peer_addr, + struct ceph_entity_name *peer_name) { struct ceph_client *client = p; - dout(30, "ceph_peer_reset peer_name = %s%d\n", ENTITY_NAME(*peer_name)); - - /* we only care about mds disconnects */ - if (le32_to_cpu(peer_name->type) != CEPH_ENTITY_TYPE_MDS) - return; - - ceph_mdsc_handle_reset(&client->mdsc, le32_to_cpu(peer_name->num)); + dout(30, "ceph_peer_reset %s%d\n", ENTITY_NAME(*peer_name)); + switch (le32_to_cpu(peer_name->type)) { + case CEPH_ENTITY_TYPE_MDS: + return ceph_mdsc_handle_reset(&client->mdsc, + le32_to_cpu(peer_name->num)); + case CEPH_ENTITY_TYPE_OSD: + return ceph_osdc_handle_reset(&client->osdc, peer_addr); + } } -- 2.39.5