From ee2175fdb5e429cba077da4b9756ae53089cc10a Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 1 Sep 2009 11:04:03 -0700 Subject: [PATCH] kclient: use subscribe to get mdsmap, osdmap updates --- src/kernel/debugfs.c | 10 +- src/kernel/mon_client.c | 247 ++++++++++++++++++---------------------- src/kernel/mon_client.h | 14 +-- src/kernel/osd_client.c | 8 +- src/kernel/super.c | 6 +- 5 files changed, 126 insertions(+), 159 deletions(-) diff --git a/src/kernel/debugfs.c b/src/kernel/debugfs.c index 9ee53bfdcf960..5de58fef5d676 100644 --- a/src/kernel/debugfs.c +++ b/src/kernel/debugfs.c @@ -122,12 +122,14 @@ static int monc_show(struct seq_file *s, void *p) int got; struct ceph_mon_client *monc = &client->monc; - mutex_lock(&monc->statfs_mutex); + mutex_lock(&monc->mutex); - if (monc->want_osdmap) - seq_printf(s, "want osdmap %u\n", (unsigned)monc->want_osdmap); if (monc->have_mdsmap) seq_printf(s, "have mdsmap %u\n", (unsigned)monc->have_mdsmap); + if (monc->have_osdmap) + seq_printf(s, "have osdmap %u\n", (unsigned)monc->have_osdmap); + if (monc->want_next_osdmap) + seq_printf(s, "want next osdmap\n"); while (nexttid < monc->last_tid) { got = radix_tree_gang_lookup(&monc->statfs_request_tree, @@ -138,7 +140,7 @@ static int monc_show(struct seq_file *s, void *p) seq_printf(s, "%lld statfs\n", req->tid); } - mutex_unlock(&monc->statfs_mutex); + mutex_unlock(&monc->mutex); return 0; } diff --git a/src/kernel/mon_client.c b/src/kernel/mon_client.c index aaae7cdf6c62b..f86c85412ab9f 100644 --- a/src/kernel/mon_client.c +++ b/src/kernel/mon_client.c @@ -92,7 +92,7 @@ int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr) /* * Open a session with a (new) monitor. */ -static int open_session(struct ceph_mon_client *monc, int newmon) +static int __open_session(struct ceph_mon_client *monc, int newmon) { char r; @@ -126,9 +126,29 @@ static int open_session(struct ceph_mon_client *monc, int newmon) return 0; } -static void send_subscribe(struct ceph_mon_client *monc) +/* + * Reschedule delayed work timer. + */ +static void __schedule_delayed(struct ceph_mon_client *monc) +{ + unsigned delay; + + if (monc->want_mount) + delay = 10 * HZ; + else + delay = 20 * HZ; + dout("__schedule_delayed after %u\n", delay); + schedule_delayed_work(&monc->delayed_work, delay); +} + +/* + * Send subscribe request for mdsmap and/or osdmap. + */ +static void __send_subscribe(struct ceph_mon_client *monc) { - if (!monc->subscribed && !monc->sub_sent) { + if ((!monc->subscribed && !monc->sub_sent) || + time_before(jiffies, monc->sub_renew_after) || + !monc->want_next_osdmap) { struct ceph_msg *msg; struct ceph_mon_subscribe_item *i; void *p, *end; @@ -142,12 +162,19 @@ static void send_subscribe(struct ceph_mon_client *monc) p = msg->front.iov_base; end = p + msg->front.iov_len; - ceph_encode_32(&p, 1); + ceph_encode_32(&p, 1 + monc->want_next_osdmap); ceph_encode_string(&p, end, "mdsmap", 6); i = p; i->have = cpu_to_le64(monc->have_mdsmap); i->onetime = 0; p += sizeof(*i); + if (monc->want_next_osdmap) { + ceph_encode_string(&p, end, "osdmap", 6); + i = p; + i->have = cpu_to_le64(monc->have_osdmap); + i->onetime = 1; + p += sizeof(*i); + } msg->front.iov_len = p - msg->front.iov_base; msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); @@ -160,143 +187,63 @@ static void send_subscribe(struct ceph_mon_client *monc) static void handle_subscribe_ack(struct ceph_mon_client *monc, struct ceph_msg *msg) { - unsigned ms; + unsigned seconds; void *p = msg->front.iov_base; void *end = p + msg->front.iov_len; dout("handle_subscribe_ack\n"); - ceph_decode_64_safe(&p, end, ms, bad); + ceph_decode_64_safe(&p, end, seconds, bad); monc->subscribed = true; - monc->sub_renew_after = monc->sub_sent + ms / 1000; + monc->sub_renew_after = monc->sub_sent + seconds*HZ; bad: pr_err("ceph got corrupt subscribe-ack msg\n"); } /* - * Generic timeout mechanism for monitor requests, so we can resend if - * we don't get a timely reply. Exponential backoff. - */ -static void reschedule_timeout(struct ceph_mon_request *req) -{ - schedule_delayed_work(&req->delayed_work, req->delay); - if (req->delay < MAX_DELAY_INTERVAL) - req->delay *= 2; - else - req->delay = MAX_DELAY_INTERVAL; -} - -static void retry_request(struct work_struct *work) -{ - struct ceph_mon_request *req = - container_of(work, struct ceph_mon_request, - delayed_work.work); - - /* - * if lock is contended, reschedule sooner. we can't wait for - * mutex because we cancel the timeout sync with lock held. - */ - if (mutex_trylock(&req->monc->req_mutex)) { - req->do_request(req->monc, 1); - reschedule_timeout(req); - mutex_unlock(&req->monc->req_mutex); - } else - schedule_delayed_work(&req->delayed_work, BASE_DELAY_INTERVAL); -} - -static void cancel_timeout(struct ceph_mon_request *req) -{ - cancel_delayed_work_sync(&req->delayed_work); - req->delay = BASE_DELAY_INTERVAL; -} - -static void init_request_type(struct ceph_mon_client *monc, - struct ceph_mon_request *req, - ceph_monc_request_func_t func) -{ - req->monc = monc; - INIT_DELAYED_WORK(&req->delayed_work, retry_request); - req->delay = 0; - req->do_request = func; -} - - -/* - * Possibly cancel our desire for a new map + * Keep track of which maps we have */ int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got) { - mutex_lock(&monc->req_mutex); + mutex_lock(&monc->mutex); monc->have_mdsmap = got; - mutex_unlock(&monc->req_mutex); + mutex_unlock(&monc->mutex); return 0; } - -/* - * osd map - */ -static void request_osdmap(struct ceph_mon_client *monc, int newmon) -{ - struct ceph_msg *msg; - struct ceph_osd_getmap *h; - int err; - - dout("request_osdmap want %u\n", monc->want_osdmap); - err = open_session(monc, newmon); - if (err) - return; - msg = ceph_msg_new(CEPH_MSG_OSD_GETMAP, sizeof(*h), 0, 0, NULL); - if (IS_ERR(msg)) - return; - h = msg->front.iov_base; - h->fsid = monc->monmap->fsid; - h->start = cpu_to_le32(monc->want_osdmap); - h->have_version = cpu_to_le64(monc->want_osdmap ? - monc->want_osdmap-1 : 0); - ceph_con_send(monc->con, msg); -} - -void ceph_monc_request_osdmap(struct ceph_mon_client *monc, u32 want) +int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got) { - dout("request_osdmap want %u\n", want); - mutex_lock(&monc->req_mutex); - monc->osdreq.delay = BASE_DELAY_INTERVAL; - monc->want_osdmap = want; - request_osdmap(monc, 0); - reschedule_timeout(&monc->osdreq); - mutex_unlock(&monc->req_mutex); + mutex_lock(&monc->mutex); + monc->have_osdmap = got; + monc->want_next_osdmap = false; + mutex_unlock(&monc->mutex); + return 0; } -int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got) +/* + * Register interest in the next osdmap + */ +void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) { - int ret = 0; - - mutex_lock(&monc->req_mutex); - if (got < monc->want_osdmap) { - dout("got_osdmap %u < wanted %u\n", got, monc->want_osdmap); - ret = -EAGAIN; - } else { - dout("got_osdmap %u >= wanted %u\n", got, monc->want_osdmap); - monc->want_osdmap = 0; - cancel_timeout(&monc->osdreq); - } - mutex_unlock(&monc->req_mutex); - return ret; + dout("request_next_osdmap have %u\n", monc->have_osdmap); + mutex_lock(&monc->mutex); + monc->want_next_osdmap = true; + __send_subscribe(monc); + mutex_unlock(&monc->mutex); } /* * mount */ -static void request_mount(struct ceph_mon_client *monc, int newmon) +static void __request_mount(struct ceph_mon_client *monc) { struct ceph_msg *msg; struct ceph_client_mount *h; int err; - dout("request_mount\n"); - err = open_session(monc, newmon); + dout("__request_mount\n"); + err = __open_session(monc, 1); if (err) return; msg = ceph_msg_new(CEPH_MSG_CLIENT_MOUNT, sizeof(*h), 0, 0, NULL); @@ -309,11 +256,10 @@ static void request_mount(struct ceph_mon_client *monc, int newmon) void ceph_monc_request_mount(struct ceph_mon_client *monc) { - mutex_lock(&monc->req_mutex); - monc->mountreq.delay = BASE_DELAY_INTERVAL; - request_mount(monc, 0); - reschedule_timeout(&monc->mountreq); - mutex_unlock(&monc->req_mutex); + mutex_lock(&monc->mutex); + __request_mount(monc); + __schedule_delayed(monc); + mutex_unlock(&monc->mutex); } /* @@ -336,6 +282,8 @@ static void handle_mount_ack(struct ceph_mon_client *monc, struct ceph_msg *msg) return; } + mutex_lock(&monc->mutex); + dout("handle_mount_ack\n"); p = msg->front.iov_base; end = p + msg->front.iov_len; @@ -371,19 +319,19 @@ static void handle_mount_ack(struct ceph_mon_client *monc, struct ceph_msg *msg) client->signed_ticket = NULL; client->signed_ticket_len = 0; + monc->want_mount = false; + client->whoami = cnum; client->msgr->inst.name.num = cpu_to_le32(cnum); client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT; - memcpy(&client->msgr->inst.addr, &addr, sizeof(addr)); - pr_info("ceph client%d %u.%u.%u.%u:%u fsid %llx.%llx\n", client->whoami, IPQUADPORT(addr.ipaddr), le64_to_cpu(__ceph_fsid_major(&client->monc.monmap->fsid)), le64_to_cpu(__ceph_fsid_minor(&client->monc.monmap->fsid))); - ceph_debugfs_client_init(client); - send_subscribe(monc); + ceph_debugfs_client_init(client); + __send_subscribe(monc); err = 0; goto out; @@ -392,9 +340,7 @@ bad: pr_err("ceph error decoding mount_ack message\n"); out: client->mount_err = err; - mutex_lock(&monc->req_mutex); - cancel_timeout(&monc->mountreq); - mutex_unlock(&monc->req_mutex); + mutex_unlock(&monc->mutex); wake_up(&client->mount_wq); } @@ -416,13 +362,13 @@ static void handle_statfs_reply(struct ceph_mon_client *monc, tid = le64_to_cpu(reply->tid); dout("handle_statfs_reply %p tid %llu\n", msg, tid); - mutex_lock(&monc->statfs_mutex); + mutex_lock(&monc->mutex); req = radix_tree_lookup(&monc->statfs_request_tree, tid); if (req) { *req->buf = reply->st; req->result = 0; } - mutex_unlock(&monc->statfs_mutex); + mutex_unlock(&monc->mutex); if (req) complete(&req->completion); return; @@ -442,7 +388,7 @@ static int send_statfs(struct ceph_mon_client *monc, int err; dout("send_statfs tid %llu\n", req->tid); - err = open_session(monc, 0); + err = __open_session(monc, 0); if (err) return err; msg = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), 0, 0, NULL); @@ -469,29 +415,29 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) init_completion(&req.completion); /* register request */ - mutex_lock(&monc->statfs_mutex); + mutex_lock(&monc->mutex); req.tid = ++monc->last_tid; req.last_attempt = jiffies; req.delay = BASE_DELAY_INTERVAL; if (radix_tree_insert(&monc->statfs_request_tree, req.tid, &req) < 0) { - mutex_unlock(&monc->statfs_mutex); + mutex_unlock(&monc->mutex); pr_err("ceph ENOMEM in do_statfs\n"); return -ENOMEM; } monc->num_statfs_requests++; ceph_msgpool_resv(&monc->client->msgpool_statfs_reply, 1); - mutex_unlock(&monc->statfs_mutex); + mutex_unlock(&monc->mutex); /* send request and wait */ err = send_statfs(monc, &req); if (!err) err = wait_for_completion_interruptible(&req.completion); - mutex_lock(&monc->statfs_mutex); + mutex_lock(&monc->mutex); radix_tree_delete(&monc->statfs_request_tree, req.tid); monc->num_statfs_requests--; ceph_msgpool_resv(&monc->client->msgpool_statfs_reply, -1); - mutex_unlock(&monc->statfs_mutex); + mutex_unlock(&monc->mutex); if (!err) err = req.result; @@ -508,7 +454,7 @@ static void resend_statfs(struct ceph_mon_client *monc) int did = 0; struct ceph_mon_statfs_request *req; - mutex_lock(&monc->statfs_mutex); + mutex_lock(&monc->mutex); while (1) { got = radix_tree_gang_lookup(&monc->statfs_request_tree, (void **)&req, @@ -520,9 +466,30 @@ static void resend_statfs(struct ceph_mon_client *monc) send_statfs(monc, req); } - mutex_unlock(&monc->statfs_mutex); + mutex_unlock(&monc->mutex); } +/* + * Delayed work. If we haven't mounted yet, retry. Otherwise, + * renew/retry subscription as needed (in case it is timing out, or we + * got an ENOMEM). And keep the monitor connection alive. + */ +static void delayed_work(struct work_struct *work) +{ + struct ceph_mon_client *monc = + container_of(work, struct ceph_mon_client, delayed_work.work); + + dout("monc delayed_work\n"); + mutex_lock(&monc->mutex); + if (monc->want_mount) { + __request_mount(monc); + } else { + ceph_con_keepalive(monc->con); + __send_subscribe(monc); + } + __schedule_delayed(monc); + mutex_unlock(&monc->mutex); +} int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) { @@ -530,24 +497,25 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) memset(monc, 0, sizeof(*monc)); monc->client = cl; monc->monmap = NULL; - mutex_init(&monc->statfs_mutex); + mutex_init(&monc->mutex); + monc->con = NULL; + + INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); INIT_RADIX_TREE(&monc->statfs_request_tree, GFP_NOFS); monc->num_statfs_requests = 0; monc->last_tid = 0; - init_request_type(monc, &monc->osdreq, request_osdmap); - init_request_type(monc, &monc->mountreq, request_mount); - mutex_init(&monc->req_mutex); + monc->have_mdsmap = 0; - monc->want_osdmap = 0; - monc->con = NULL; + monc->have_osdmap = 0; + monc->want_next_osdmap = true; + monc->want_mount = true; return 0; } void ceph_monc_stop(struct ceph_mon_client *monc) { dout("stop\n"); - cancel_timeout(&monc->osdreq); - cancel_timeout(&monc->mountreq); + cancel_delayed_work_sync(&monc->delayed_work); if (monc->con) { monc->con->ops->put(monc->con); monc->con = NULL; @@ -601,9 +569,10 @@ static void mon_reset(struct ceph_connection *con) struct ceph_mon_client *monc = con->private; dout("mon_reset\n"); - if (open_session(monc, 1) < 0) + if (__open_session(monc, 1) < 0) return; /* delayed work handler will retry */ + __send_subscribe(monc); resend_statfs(monc); } diff --git a/src/kernel/mon_client.h b/src/kernel/mon_client.h index c1ed012ada2de..a49a5d59ef0bf 100644 --- a/src/kernel/mon_client.h +++ b/src/kernel/mon_client.h @@ -53,22 +53,22 @@ struct ceph_mon_client { struct ceph_client *client; struct ceph_monmap *monmap; + struct mutex mutex; + struct delayed_work delayed_work; + int cur_mon; /* last monitor i contacted */ bool subscribed; unsigned long sub_sent, sub_renew_after; struct ceph_connection *con; /* pending statfs requests */ - struct mutex statfs_mutex; struct radix_tree_root statfs_request_tree; int num_statfs_requests; u64 last_tid; /* mds/osd map or mount requests */ - struct mutex req_mutex; - struct ceph_mon_request osdreq, mountreq; - u32 have_mdsmap; - u32 want_osdmap; + bool want_mount, want_next_osdmap; + u32 have_osdmap, have_mdsmap; struct dentry *debugfs_file; }; @@ -87,10 +87,10 @@ extern void ceph_monc_stop(struct ceph_mon_client *monc); * get what we want. */ extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 have); - -extern void ceph_monc_request_osdmap(struct ceph_mon_client *monc, u32 want); extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have); +extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc); + extern void ceph_monc_request_mount(struct ceph_mon_client *monc); extern int ceph_monc_do_statfs(struct ceph_mon_client *monc, diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c index 12e0d8de46a08..cb77848c7064c 100644 --- a/src/kernel/osd_client.c +++ b/src/kernel/osd_client.c @@ -523,8 +523,7 @@ static int send_request(struct ceph_osd_client *osdc, return err; if (req->r_osd == NULL) { dout("send_request %p no up osds in pg\n", req); - ceph_monc_request_osdmap(&osdc->client->monc, - osdc->osdmap->epoch+1); + ceph_monc_request_next_osdmap(&osdc->client->monc); return 0; } @@ -565,7 +564,7 @@ static void handle_timeout(struct work_struct *work) dout("timeout\n"); down_read(&osdc->map_sem); - ceph_monc_request_osdmap(&osdc->client->monc, osdc->osdmap->epoch+1); + ceph_monc_request_next_osdmap(&osdc->client->monc); mutex_lock(&osdc->request_mutex); for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { @@ -770,8 +769,7 @@ static void kick_requests(struct ceph_osd_client *osdc, if (needmap) { dout("%d requests for down osds, need new map\n", needmap); - ceph_monc_request_osdmap(&osdc->client->monc, - osdc->osdmap->epoch+1); + ceph_monc_request_next_osdmap(&osdc->client->monc); } } diff --git a/src/kernel/super.c b/src/kernel/super.c index 6034029784f79..852e67c108f5f 100644 --- a/src/kernel/super.c +++ b/src/kernel/super.c @@ -727,7 +727,6 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt, { struct ceph_entity_addr *myaddr = NULL; int err; - int request_interval = 5 * HZ; unsigned long timeout = client->mount_args.mount_timeout * HZ; unsigned long started = jiffies; /* note the start time */ struct dentry *root; @@ -756,10 +755,10 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt, goto out; /* wait */ - dout("mount waiting for niynt\n"); + dout("mount waiting for mount\n"); err = wait_event_interruptible_timeout(client->mount_wq, client->mount_err || have_mon_map(client), - request_interval); + timeout); if (err == -EINTR || err == -ERESTARTSYS) goto out; if (client->mount_err) { @@ -768,7 +767,6 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt, } } - dout("mount opening root\n"); root = open_root_dentry(client, "", started); if (IS_ERR(root)) { -- 2.39.5