From e2aa8a9eec7a3646cd27199cb8292e05da30bfe3 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 21 Apr 2008 07:45:31 -0700 Subject: [PATCH] kclient: osd timeouts; also cleaned up MOSDGetMap and map request handling --- src/include/ceph_fs.h | 4 +- src/kernel/mon_client.c | 34 ++- src/kernel/mon_client.h | 4 +- src/kernel/osd_client.c | 468 ++++++++++++++++++++++++-------------- src/kernel/osd_client.h | 19 +- src/kernel/osdmap.c | 14 +- src/kernel/osdmap.h | 11 +- src/kernel/super.c | 6 + src/kernel/super.h | 1 + src/messages/MOSDGetMap.h | 12 +- src/mon/OSDMonitor.cc | 22 +- src/mon/OSDMonitor.h | 2 +- src/osdc/Objecter.cc | 2 +- 13 files changed, 371 insertions(+), 228 deletions(-) diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index d726a3e56bb30..fe6220c16747c 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -291,12 +291,12 @@ struct ceph_statfs { struct ceph_osd_getmap { struct ceph_fsid fsid; - __le64 start, want; + __le32 start; } __attribute__ ((packed)); struct ceph_mds_getmap { struct ceph_fsid fsid; - __le64 have; + __le32 have; } __attribute__ ((packed)); diff --git a/src/kernel/mon_client.c b/src/kernel/mon_client.c index fdbd3c05df4f1..644a34209514f 100644 --- a/src/kernel/mon_client.c +++ b/src/kernel/mon_client.c @@ -158,8 +158,7 @@ static void do_request_osdmap(struct work_struct *work) return; h = msg->front.iov_base; h->fsid = monc->monmap->fsid; - h->start = cpu_to_le32(monc->have_osdmap); - h->want = cpu_to_le32(monc->want_osdmap); + h->start = cpu_to_le32(monc->have_osdmap + 1); msg->hdr.dst = monc->monmap->mon_inst[mon]; ceph_msg_send(monc->client->msgr, msg, 0); @@ -168,35 +167,29 @@ static void do_request_osdmap(struct work_struct *work) delayed_work(&monc->osd_delayed_work, &monc->osd_delay); } -void ceph_monc_request_osdmap(struct ceph_mon_client *monc, - __u32 have, __u32 want) +void ceph_monc_request_osdmap(struct ceph_mon_client *monc, __u32 have) { - dout(5, "ceph_monc_request_osdmap have %u want %u\n", have, want); + dout(5, "ceph_monc_request_osdmap have %u\n", have); monc->osd_delay = BASE_DELAY_INTERVAL; monc->have_osdmap = have; - monc->want_osdmap = want; do_request_osdmap(&monc->osd_delayed_work.work); } -int ceph_monc_got_osdmap(struct ceph_mon_client *monc, __u32 have) +int ceph_monc_got_osdmap(struct ceph_mon_client *monc, __u32 got) { - dout(5, "ceph_monc_got_osdmap calling cancel_delayed_work_sync\n"); + if (got <= monc->have_osdmap) { + dout(5, "ceph_monc_got_osdmap got %u <= had %u, will retry\n", + got, monc->have_osdmap); + return -EAGAIN; + } /* we got map so take map request out of queue */ + dout(5, "ceph_monc_got_osdmap got %u > had %u\n", + got, monc->have_osdmap); cancel_delayed_work_sync(&monc->osd_delayed_work); monc->osd_delay = BASE_DELAY_INTERVAL; - - if (have > monc->want_osdmap) { - monc->want_osdmap = 0; - monc->have_osdmap = 0; - dout(5, "ceph_monc_got_osdmap have %u > wanted %u\n", - have, monc->want_osdmap); - return 0; - } else { - dout(5, "ceph_monc_got_osdmap have %u <= wanted %u *****\n", - have, monc->want_osdmap); - return -EAGAIN; - } + monc->have_osdmap = 0; + return 0; } @@ -331,6 +324,5 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) monc->last_tid = 0; monc->have_mdsmap = 0; monc->have_osdmap = 0; - monc->want_osdmap = 0; return 0; } diff --git a/src/kernel/mon_client.h b/src/kernel/mon_client.h index 849653620fb41..f9cce5e706d3d 100644 --- a/src/kernel/mon_client.h +++ b/src/kernel/mon_client.h @@ -39,7 +39,6 @@ struct ceph_mon_client { unsigned long umount_delay; u32 have_mdsmap; /* protected by caller's lock */ - u32 want_osdmap; /* protected by caller's lock */ u32 have_osdmap; /* protected by caller's lock */ }; @@ -52,8 +51,7 @@ extern int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl); extern void ceph_monc_request_mdsmap(struct ceph_mon_client *monc, __u32 have); extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, __u32 have); -extern void ceph_monc_request_osdmap(struct ceph_mon_client *monc, - __u32 have, __u32 want); +extern void ceph_monc_request_osdmap(struct ceph_mon_client *monc, __u32 have); extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, __u32 have); extern void ceph_monc_request_umount(struct ceph_mon_client *monc); diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c index fc9c18c996f88..e02f0a4a3e7fe 100644 --- a/src/kernel/osd_client.c +++ b/src/kernel/osd_client.c @@ -22,122 +22,6 @@ struct ceph_readdesc { loff_t len; }; -void ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) -{ - dout(5, "init\n"); - spin_lock_init(&osdc->lock); - osdc->client = client; - osdc->osdmap = NULL; - osdc->last_requested_map = 0; - osdc->last_tid = 0; - INIT_RADIX_TREE(&osdc->request_tree, GFP_KERNEL); - init_completion(&osdc->map_waiters); -} - -void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) -{ - void *p, *end, *next; - __u32 nr_maps, maplen; - __u32 epoch; - struct ceph_osdmap *newmap = 0; - int err; - struct ceph_fsid fsid; - - dout(2, "handle_map, have %u\n", osdc->osdmap ? osdc->osdmap->epoch:0); - p = msg->front.iov_base; - end = p + msg->front.iov_len; - - /* verify fsid */ - ceph_decode_need(&p, end, sizeof(fsid), bad); - ceph_decode_64(&p, fsid.major); - ceph_decode_64(&p, fsid.minor); - if (!ceph_fsid_equal(&fsid, &osdc->client->monc.monmap->fsid)) { - derr(0, "got map with wrong fsid, ignoring\n"); - return; - } - - /* incremental maps */ - ceph_decode_32_safe(&p, end, nr_maps, bad); - dout(10, " %d inc maps\n", nr_maps); - while (nr_maps > 0) { - ceph_decode_need(&p, end, 2*sizeof(__u32), bad); - ceph_decode_32(&p, epoch); - ceph_decode_32(&p, maplen); - ceph_decode_need(&p, end, maplen, bad); - next = p + maplen; - if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) { - dout(10, "applying incremental map %u len %d\n", - epoch, maplen); - newmap = apply_incremental(&p, p+maplen, osdc->osdmap); - if (IS_ERR(newmap)) { - err = PTR_ERR(newmap); - goto bad; - } - if (newmap != osdc->osdmap) { - osdmap_destroy(osdc->osdmap); - osdc->osdmap = newmap; - } - } else { - dout(10, "ignoring incremental map %u len %d\n", - epoch, maplen); - } - p = next; - nr_maps--; - } - if (newmap) - goto out; - - /* full maps */ - ceph_decode_32_safe(&p, end, nr_maps, bad); - dout(30, " %d full maps\n", nr_maps); - while (nr_maps > 1) { - ceph_decode_need(&p, end, 2*sizeof(__u32), bad); - ceph_decode_32(&p, epoch); - ceph_decode_32(&p, maplen); - ceph_decode_need(&p, end, maplen, bad); - dout(5, "skipping non-latest full map %u len %d\n", - epoch, maplen); - p += maplen; - nr_maps--; - } - if (nr_maps) { - ceph_decode_need(&p, end, 2*sizeof(__u32), bad); - ceph_decode_32(&p, epoch); - ceph_decode_32(&p, maplen); - ceph_decode_need(&p, end, maplen, bad); - if (osdc->osdmap && osdc->osdmap->epoch >= epoch) { - dout(10, "skipping full map %u len %d, " - "older than our %u\n", epoch, maplen, - osdc->osdmap->epoch); - p += maplen; - } else { - dout(10, "taking full map %u len %d\n", epoch, maplen); - newmap = osdmap_decode(&p, p+maplen); - if (IS_ERR(newmap)) { - err = PTR_ERR(newmap); - goto bad; - } - if (osdc->osdmap) - osdmap_destroy(osdc->osdmap); - osdc->osdmap = newmap; - } - } - dout(2, "handle_map done\n"); - - ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); - - /* kick any pending requests that need kicking */ - /* WRITE ME */ - -out: - return; - -bad: - derr(1, "handle_map corrupt msg\n"); - goto out; -} - - /* * requests @@ -188,14 +72,25 @@ static struct ceph_osd_request *alloc_request(int nr_pages, return ERR_PTR(-ENOMEM); req->r_request = msg; req->r_nr_pages = nr_pages; + memset(&req->r_last_osd, 0, sizeof(req->r_last_osd)); return req; } +static void reschedule_timeout(struct ceph_osd_client *osdc) +{ + int timeout = osdc->client->mount_args.osd_timeout; + dout(10, "reschedule timeout (%d seconds)\n", timeout); + schedule_delayed_work(&osdc->timeout_work, + round_jiffies_relative(timeout*HZ)); +} + static int register_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) { struct ceph_osd_request_head *head = req->r_request->front.iov_base; + int rc; + spin_lock(&osdc->request_lock); req->r_tid = head->tid = ++osdc->last_tid; req->r_flags = 0; req->r_pgid.pg64 = le64_to_cpu(head->layout.ol_pgid); @@ -205,24 +100,53 @@ static int register_request(struct ceph_osd_client *osdc, init_completion(&req->r_completion); dout(30, "register_request %p tid %lld\n", req, req->r_tid); - return radix_tree_insert(&osdc->request_tree, req->r_tid, (void *)req); + rc = radix_tree_insert(&osdc->request_tree, req->r_tid, (void *)req); + + if (osdc->nr_requests == 0) + reschedule_timeout(osdc); + osdc->nr_requests++; + + spin_unlock(&osdc->request_lock); + return rc; } -static void send_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) +static void unregister_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) +{ + dout(30, "unregister_request %p tid %lld\n", req, req->r_tid); + spin_lock(&osdc->request_lock); + radix_tree_delete(&osdc->request_tree, req->r_tid); + + osdc->nr_requests--; + cancel_delayed_work_sync(&osdc->timeout_work); + if (osdc->nr_requests) + reschedule_timeout(osdc); + + spin_unlock(&osdc->request_lock); + put_request(req); +} + +/* + * pick an osd. the pg primary, namely. + */ +static int pick_osd(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) { int ruleno; - int osds[10]; - int nr_osds; int i; int pps; /* placement ps */ - - dout(30, "send_request %p\n", req); - + int osds[10]; + int nr_osds; + ruleno = crush_find_rule(osdc->osdmap->crush, req->r_pgid.pg.pool, req->r_pgid.pg.type, req->r_pgid.pg.size); - BUG_ON(ruleno < 0); /* fixme, need some proper error handling here */ - dout(30, "using crush rule %d\n", ruleno); + if (ruleno < 0) { + derr(0, "pick_osd no crush rule for pool %d type %d size %d\n", + req->r_pgid.pg.pool, req->r_pgid.pg.type, + req->r_pgid.pg.size); + return -1; + } + if (req->r_pgid.pg.preferred >= 0) pps = ceph_stable_mod(req->r_pgid.pg.ps, osdc->osdmap->lpgp_num, @@ -233,32 +157,71 @@ static void send_request(struct ceph_osd_client *osdc, osdc->osdmap->pgp_num_mask); nr_osds = crush_do_rule(osdc->osdmap->crush, ruleno, pps, osds, req->r_pgid.pg.size, req->r_pgid.pg.preferred); - for (i = 0; i < nr_osds; i++) { + + for (i = 0; i < nr_osds; i++) if (ceph_osd_is_up(osdc->osdmap, osds[i])) - break; - } - if (i < nr_osds) { - dout(10, "send_request %p tid %llu to osd%d flags %d\n", - req, req->r_tid, osds[i], req->r_flags); - req->r_request->hdr.dst.name.type = - cpu_to_le32(CEPH_ENTITY_TYPE_OSD); - req->r_request->hdr.dst.name.num = cpu_to_le32(osds[i]); - req->r_request->hdr.dst.addr = osdc->osdmap->osd_addr[osds[i]]; - ceph_msg_get(req->r_request); /* send consumes a ref */ - ceph_msg_send(osdc->client->msgr, req->r_request, 0); - } else { - dout(10, "send_request no osds in this pg are up\n"); - ceph_monc_request_osdmap(&osdc->client->monc, - osdc->osdmap->epoch, 0); - } + return osds[i]; + return -1; } -static void unregister_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) +/* + * a single ceph_msg can't be queued for send twice, unless it's + * already been delivered (i.e. we have the only remaining reference). + */ +static struct ceph_msg *redup_request(struct ceph_msg *old) { - dout(30, "unregister_request %p tid %lld\n", req, req->r_tid); - radix_tree_delete(&osdc->request_tree, req->r_tid); - put_request(req); + struct ceph_msg *dup; + + if (atomic_read(&old->nref) == 1) + return old; /* we have only ref, all good */ + + dup = ceph_msg_new(le32_to_cpu(old->hdr.type), + le32_to_cpu(old->hdr.front_len), + le32_to_cpu(old->hdr.data_len), + le32_to_cpu(old->hdr.data_off), + old->pages); + BUG_ON(!dup); + memcpy(dup->front.iov_base, old->front.iov_base, + le32_to_cpu(old->hdr.front_len)); + if (old->pages) + derr(0, "WARNING: unsafely referenced old pages for %p\n", + old); + ceph_msg_put(old); + return dup; +} + +/* + * caller should hold map_sem (for read) + */ +static void send_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req, + int osd) +{ + struct ceph_osd_request_head *reqhead; + + if (osd < 0) + osd = pick_osd(osdc, req); + if (osd < 0) { + dout(10, "send_request %p no up osds in pg\n", req); + ceph_monc_request_osdmap(&osdc->client->monc, + osdc->osdmap->epoch); + return; + } + + dout(10, "send_request %p tid %llu to osd%d flags %d\n", + req, req->r_tid, osd, req->r_flags); + + reqhead = req->r_request->front.iov_base; + reqhead->osdmap_epoch = osdc->osdmap->epoch; + + req->r_request->hdr.dst.name.type = + cpu_to_le32(CEPH_ENTITY_TYPE_OSD); + req->r_request->hdr.dst.name.num = cpu_to_le32(osd); + req->r_request->hdr.dst.addr = osdc->osdmap->osd_addr[osd]; + req->r_last_osd = req->r_request->hdr.dst.addr; + + ceph_msg_get(req->r_request); /* send consumes a ref */ + ceph_msg_send(osdc->client->msgr, req->r_request, 0); } /* @@ -274,11 +237,12 @@ void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) /* lookup */ tid = le64_to_cpu(rhead->tid); - spin_lock(&osdc->lock); + + spin_lock(&osdc->request_lock); req = radix_tree_lookup(&osdc->request_tree, tid); if (req == NULL) { dout(10, "handle_reply tid %llu dne\n", tid); - spin_unlock(&osdc->lock); + spin_unlock(&osdc->request_lock); return; } get_request(req); @@ -293,12 +257,160 @@ void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) dout(10, "handle_reply tid %llu flags %d |= %d\n", tid, req->r_flags, rhead->flags); req->r_flags |= rhead->flags; - spin_unlock(&osdc->lock); + spin_unlock(&osdc->request_lock); complete(&req->r_completion); put_request(req); } +/* + * caller should hold read sem + */ +static int kick_requests(struct ceph_osd_client *osdc) +{ + u64 next_tid = 0; + struct ceph_osd_request *req; + int got; + int osd; + int ret = 0; + +more: + spin_lock(&osdc->request_lock); + got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req, + next_tid, 1); + if (got) { + next_tid = req->r_tid + 1; + osd = pick_osd(osdc, req); + if (osd < 0) + ret = 1; /* request a newer map */ + else if (!ceph_entity_addr_equal(&req->r_last_osd, + &osdc->osdmap->osd_addr[osd])) { + dout(20, "kicking tid %llu osd%d\n", req->r_tid, osd); + get_request(req); + spin_unlock(&osdc->request_lock); + req->r_request = redup_request(req->r_request); + send_request(osdc, req, osd); + put_request(req); + goto more; + } + spin_unlock(&osdc->request_lock); + goto more; + } + spin_unlock(&osdc->request_lock); + if (ret) + dout(10, "%d requests still pending on down osds\n", ret); + return ret; +} + +void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) +{ + void *p, *end, *next; + __u32 nr_maps, maplen; + __u32 epoch; + struct ceph_osdmap *newmap = 0; + int err; + struct ceph_fsid fsid; + + dout(2, "handle_map, have %u\n", osdc->osdmap ? osdc->osdmap->epoch:0); + p = msg->front.iov_base; + end = p + msg->front.iov_len; + + /* verify fsid */ + ceph_decode_need(&p, end, sizeof(fsid), bad); + ceph_decode_64(&p, fsid.major); + ceph_decode_64(&p, fsid.minor); + if (!ceph_fsid_equal(&fsid, &osdc->client->monc.monmap->fsid)) { + derr(0, "got map with wrong fsid, ignoring\n"); + return; + } + + down_write(&osdc->map_sem); + + /* incremental maps */ + ceph_decode_32_safe(&p, end, nr_maps, bad); + dout(10, " %d inc maps\n", nr_maps); + while (nr_maps > 0) { + ceph_decode_need(&p, end, 2*sizeof(__u32), bad); + ceph_decode_32(&p, epoch); + ceph_decode_32(&p, maplen); + ceph_decode_need(&p, end, maplen, bad); + next = p + maplen; + if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) { + dout(10, "applying incremental map %u len %d\n", + epoch, maplen); + newmap = apply_incremental(&p, next, osdc->osdmap, + osdc->client->msgr); + if (IS_ERR(newmap)) { + err = PTR_ERR(newmap); + goto bad; + } + if (newmap != osdc->osdmap) { + osdmap_destroy(osdc->osdmap); + osdc->osdmap = newmap; + } + } else { + dout(10, "ignoring incremental map %u len %d\n", + epoch, maplen); + } + p = next; + nr_maps--; + } + if (newmap) + goto done; + + /* full maps */ + ceph_decode_32_safe(&p, end, nr_maps, bad); + dout(30, " %d full maps\n", nr_maps); + while (nr_maps > 1) { + ceph_decode_need(&p, end, 2*sizeof(__u32), bad); + ceph_decode_32(&p, epoch); + ceph_decode_32(&p, maplen); + ceph_decode_need(&p, end, maplen, bad); + dout(5, "skipping non-latest full map %u len %d\n", + epoch, maplen); + p += maplen; + nr_maps--; + } + if (nr_maps) { + ceph_decode_need(&p, end, 2*sizeof(__u32), bad); + ceph_decode_32(&p, epoch); + ceph_decode_32(&p, maplen); + ceph_decode_need(&p, end, maplen, bad); + if (osdc->osdmap && osdc->osdmap->epoch >= epoch) { + dout(10, "skipping full map %u len %d, " + "older than our %u\n", epoch, maplen, + osdc->osdmap->epoch); + p += maplen; + } else { + dout(10, "taking full map %u len %d\n", epoch, maplen); + newmap = osdmap_decode(&p, p+maplen); + if (IS_ERR(newmap)) { + err = PTR_ERR(newmap); + goto bad; + } + if (osdc->osdmap) + osdmap_destroy(osdc->osdmap); + osdc->osdmap = newmap; + } + } + +done: + downgrade_write(&osdc->map_sem); + ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); + if (kick_requests(osdc)) + ceph_monc_request_osdmap(&osdc->client->monc, + osdc->osdmap->epoch); + up_read(&osdc->map_sem); + return; + +bad: + derr(1, "handle_map corrupt msg\n"); + up_write(&osdc->map_sem); + return; +} + + + /* * find pages for message payload to be read into. * 0 = success, -1 failure. @@ -318,7 +430,7 @@ int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want) return -1; /* hmm! */ tid = le64_to_cpu(rhead->tid); - spin_lock(&osdc->lock); + spin_lock(&osdc->request_lock); req = radix_tree_lookup(&osdc->request_tree, tid); if (!req) { dout(10, "prepare_pages unknown tid %llu\n", tid); @@ -332,34 +444,29 @@ int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want) ret = 0; /* success */ } out: - spin_unlock(&osdc->lock); + spin_unlock(&osdc->request_lock); return ret; } - +/* + * synchronously do an osd request. + */ int do_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) { - struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; struct ceph_osd_reply_head *replyhead; __s32 rc; int bytes; /* register+send request */ - spin_lock(&osdc->lock); - rc = register_request(osdc, req); - if (rc < 0) { - spin_unlock(&osdc->lock); - return rc; - } - reqhead->osdmap_epoch = osdc->osdmap->epoch; - send_request(osdc, req); - spin_unlock(&osdc->lock); + down_read(&osdc->map_sem); + radix_tree_preload(GFP_KERNEL); + register_request(osdc, req); + send_request(osdc, req, -1); + up_read(&osdc->map_sem); rc = wait_for_completion_interruptible(&req->r_completion); - spin_lock(&osdc->lock); unregister_request(osdc, req); - spin_unlock(&osdc->lock); if (rc >= 0) { /* parse reply */ @@ -374,6 +481,35 @@ int do_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) return bytes; } +void handle_timeout(struct work_struct *work) +{ + struct ceph_osd_client *osdc = + container_of(work, struct ceph_osd_client, timeout_work.work); + dout(10, "timeout\n"); + down_read(&osdc->map_sem); + ceph_monc_request_osdmap(&osdc->client->monc, osdc->osdmap->epoch); + up_read(&osdc->map_sem); +} + +void ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) +{ + dout(5, "init\n"); + osdc->client = client; + + osdc->osdmap = NULL; + init_rwsem(&osdc->map_sem); + init_completion(&osdc->map_waiters); + osdc->last_requested_map = 0; + + spin_lock_init(&osdc->request_lock); + osdc->last_tid = 0; + osdc->nr_requests = 0; + INIT_RADIX_TREE(&osdc->request_tree, GFP_KERNEL); + INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); +} + + + /* * calculate the mapping of an extent onto an object, and fill out the * request accordingly. shorten extent as necessary if it hits an diff --git a/src/kernel/osd_client.h b/src/kernel/osd_client.h index 1769cdf600f7e..f88b8f0c43aed 100644 --- a/src/kernel/osd_client.h +++ b/src/kernel/osd_client.h @@ -9,15 +9,6 @@ #include "osdmap.h" -/* flags for page cache handling in CEPH. - * TODO: Where should they used? in which data structure? - */ -#define CEPH_PGCACHE_RD 0x00000001 -#define CEPH_PGCACHE_WR 0x00000002 -#define CEPH_PGCACHE_RDCACHE 0x00000004 -#define CEPH_PGCACHE_WRBUFFER 0x00000008 - - struct ceph_msg; /* @@ -33,6 +24,7 @@ struct ceph_osd_request { __u64 r_tid; int r_flags; struct ceph_msg *r_request; + struct ceph_entity_addr r_last_osd; /* last osd we sent request to */ union ceph_pg r_pgid; struct ceph_msg *r_reply; int r_result; @@ -43,13 +35,18 @@ struct ceph_osd_request { }; struct ceph_osd_client { - spinlock_t lock; struct ceph_client *client; + struct ceph_osdmap *osdmap; /* current map */ + struct rw_semaphore map_sem; + struct completion map_waiters; __u64 last_requested_map; + + spinlock_t request_lock; __u64 last_tid; /* tid of last request */ struct radix_tree_root request_tree; /* pending requests, by tid */ - struct completion map_waiters; + int nr_requests; + struct delayed_work timeout_work; }; extern void ceph_osdc_init(struct ceph_osd_client *osdc, diff --git a/src/kernel/osdmap.c b/src/kernel/osdmap.c index 6ad0c7fe4ab08..1f1201922ea27 100644 --- a/src/kernel/osdmap.c +++ b/src/kernel/osdmap.c @@ -388,7 +388,8 @@ bad: } struct ceph_osdmap *apply_incremental(void **p, void *end, - struct ceph_osdmap *map) + struct ceph_osdmap *map, + struct ceph_messenger *msgr) { struct ceph_osdmap *newmap = map; struct crush_map *newcrush = 0; @@ -468,9 +469,13 @@ struct ceph_osdmap *apply_incremental(void **p, void *end, while (len--) { __u32 osd; ceph_decode_32_safe(p, end, osd, bad); + (*p)++; /* clean flag */ dout(1, "osd%d down\n", osd); - if (osd < map->max_osd) + if (osd < map->max_osd) { map->osd_state[osd] &= ~CEPH_OSD_UP; + ceph_messenger_mark_down(msgr, + &map->osd_addr[osd]); + } } /* new_offload */ @@ -491,11 +496,14 @@ struct ceph_osdmap *apply_incremental(void **p, void *end, ceph_decode_32_safe(p, end, len, bad); *p += len * sizeof(__u64); - if (*p != end) + if (*p != end) { + dout(10, "trailing gunk\n"); goto bad; + } return map; bad: + derr(10, "corrupt incremantal osdmap at %p / %p\n", *p, end); return ERR_PTR(err); } diff --git a/src/kernel/osdmap.h b/src/kernel/osdmap.h index f7e8d62dc7615..2584c49e99aa1 100644 --- a/src/kernel/osdmap.h +++ b/src/kernel/osdmap.h @@ -35,8 +35,17 @@ static inline int ceph_osd_is_up(struct ceph_osdmap *map, int osd) return (osd < map->max_osd) && (map->osd_state[osd] & CEPH_OSD_UP); } +static inline struct ceph_entity_addr * +ceph_osd_addr(struct ceph_osdmap *map, int osd) +{ + if (osd >= map->max_osd) + return 0; + return &map->osd_addr[osd]; +} + extern struct ceph_osdmap *apply_incremental(void **p, void *end, - struct ceph_osdmap *map); + struct ceph_osdmap *map, + struct ceph_messenger *msgr); extern void osdmap_destroy(struct ceph_osdmap *map); extern struct ceph_osdmap *osdmap_decode(void **p, void *end); diff --git a/src/kernel/super.c b/src/kernel/super.c index 99bb76f969b2b..0f1406b15818b 100644 --- a/src/kernel/super.c +++ b/src/kernel/super.c @@ -222,6 +222,7 @@ enum { Opt_monport, Opt_port, Opt_wsize, + Opt_osdtimeout, /* int args above */ Opt_ip, }; @@ -237,6 +238,7 @@ static match_table_t arg_tokens = { {Opt_monport, "monport=%d"}, {Opt_port, "port=%d"}, {Opt_wsize, "wsize=%d"}, + {Opt_osdtimeout, "osdtimeout=%d"}, /* int args above */ {Opt_ip, "ip=%s"}, {-1, NULL} @@ -293,6 +295,7 @@ static int parse_mount_args(int flags, char *options, const char *dev_name, /* defaults */ args->mntflags = flags; args->flags = 0; + args->osd_timeout = 5; /* seconds */ /* ip1[,ip2...]:/server/path */ c = strchr(dev_name, ':'); @@ -385,6 +388,9 @@ static int parse_mount_args(int flags, char *options, const char *dev_name, case Opt_wsize: args->wsize = intval; break; + case Opt_osdtimeout: + args->osd_timeout = intval; + break; default: BUG_ON(token); diff --git a/src/kernel/super.h b/src/kernel/super.h index f8c046a1a772b..0ca0345681745 100644 --- a/src/kernel/super.h +++ b/src/kernel/super.h @@ -70,6 +70,7 @@ struct ceph_mount_args { struct ceph_entity_addr mon_addr[5]; char path[100]; int wsize; + int osd_timeout; }; /* diff --git a/src/messages/MOSDGetMap.h b/src/messages/MOSDGetMap.h index 2b439deeaf30c..8a955d76969fd 100644 --- a/src/messages/MOSDGetMap.h +++ b/src/messages/MOSDGetMap.h @@ -22,33 +22,29 @@ class MOSDGetMap : public Message { public: ceph_fsid fsid; - epoch_t start, want; + epoch_t start; // this is the first incremental the sender wants (he has start-1) MOSDGetMap() : Message(CEPH_MSG_OSD_GETMAP) {} - MOSDGetMap(ceph_fsid& f, epoch_t s=0, epoch_t w=0) : + MOSDGetMap(ceph_fsid& f, epoch_t s=0) : Message(CEPH_MSG_OSD_GETMAP), - fsid(f), start(s), want(w) { } + fsid(f), start(s) { } epoch_t get_start_epoch() { return start; } - epoch_t get_want_epoch() { return want; } const char *get_type_name() { return "get_osd_map"; } void print(ostream& out) { - out << "get_osd_map(have " << start; - if (want) out << " want " << want; + out << "get_osd_map(start " << start; out << ")"; } void encode_payload() { ::_encode(fsid, payload); ::_encode(start, payload); - ::_encode(want, payload); } void decode_payload() { int off = 0; ::_decode(fsid, payload, off); ::_decode(start, payload, off); - ::_decode(want, payload, off); } }; diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index ce48d6e7f9bdb..71c439453dee9 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -343,7 +343,9 @@ bool OSDMonitor::should_propose(double& delay) void OSDMonitor::handle_osd_getmap(MOSDGetMap *m) { - dout(7) << "handle_osd_getmap from " << m->get_source() << " from " << m->get_start_epoch() << dendl; + dout(7) << "handle_osd_getmap from " << m->get_source() + << " start " << m->get_start_epoch() + << dendl; if (!ceph_fsid_equal(&m->fsid, &mon->monmap->fsid)) { dout(0) << "handle_osd_getmap on fsid " << m->fsid << " != " << mon->monmap->fsid << dendl; @@ -351,11 +353,10 @@ void OSDMonitor::handle_osd_getmap(MOSDGetMap *m) } if (m->get_start_epoch()) { - if (m->get_want_epoch() <= osdmap.get_epoch()) + if (m->get_start_epoch() <= osdmap.get_epoch()) send_incremental(m->get_source_inst(), m->get_start_epoch()); else - waiting_for_map[m->get_source_inst()] = pair(m->get_start_epoch(), - m->get_want_epoch()); + waiting_for_map[m->get_source_inst()] = m->get_start_epoch(); } else send_full(m->get_source_inst()); @@ -540,15 +541,14 @@ void OSDMonitor::send_to_waiting() { dout(10) << "send_to_waiting " << osdmap.get_epoch() << dendl; - map >::iterator i = waiting_for_map.begin(); + map::iterator i = waiting_for_map.begin(); while (i != waiting_for_map.end()) { - if (i->second.first) { - if (i->second.second <= osdmap.get_epoch()) - send_incremental(i->first, i->second.first); + if (i->second) { + if (i->second <= osdmap.get_epoch()) + send_incremental(i->first, i->second); else { dout(10) << "send_to_waiting skipping " << i->first - << " has " << i->second.first - << " wants " << i->second.second + << " wants " << i->second << dendl; i++; continue; @@ -570,7 +570,7 @@ void OSDMonitor::send_latest(entity_inst_t who, epoch_t start) send_incremental(who, start); } else { dout(5) << "send_latest to " << who << " later" << dendl; - waiting_for_map[who] = pair(start, 0); + waiting_for_map[who] = start; } } diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h index 8ed40db7e1516..4e2e06e4a8a8e 100644 --- a/src/mon/OSDMonitor.h +++ b/src/mon/OSDMonitor.h @@ -38,7 +38,7 @@ public: OSDMap osdmap; private: - map > waiting_for_map; // who -> (has, wants) + map waiting_for_map; // who -> start epoch // [leader] OSDMap::Incremental pending_inc; diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index dc6005d587548..dbda9665eb1f4 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -146,7 +146,7 @@ void Objecter::maybe_request_map() dout(10) << "maybe_request_map requesting next osd map" << dendl; last_epoch_requested_stamp = now; last_epoch_requested = osdmap->get_epoch()+1; - messenger->send_message(new MOSDGetMap(monmap->fsid, osdmap->get_epoch(), last_epoch_requested), + messenger->send_message(new MOSDGetMap(monmap->fsid, last_epoch_requested), monmap->get_inst(monmap->pick_mon())); } -- 2.39.5