From c47c926cf090e38fdf1962ceafc5480e3d35e60e Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 31 Aug 2009 16:08:48 -0700 Subject: [PATCH] kclient: subscribe to mdsmap --- src/kernel/debugfs.c | 4 +- src/kernel/mds_client.c | 7 -- src/kernel/mon_client.c | 209 ++++++++++++++++++++-------------------- src/kernel/mon_client.h | 12 +-- src/kernel/super.c | 2 + src/kernel/super.h | 2 +- 6 files changed, 115 insertions(+), 121 deletions(-) diff --git a/src/kernel/debugfs.c b/src/kernel/debugfs.c index e98dec88af476..9ee53bfdcf960 100644 --- a/src/kernel/debugfs.c +++ b/src/kernel/debugfs.c @@ -126,8 +126,8 @@ static int monc_show(struct seq_file *s, void *p) if (monc->want_osdmap) seq_printf(s, "want osdmap %u\n", (unsigned)monc->want_osdmap); - if (monc->want_mdsmap) - seq_printf(s, "want mdsmap %u\n", (unsigned)monc->want_mdsmap); + if (monc->have_mdsmap) + seq_printf(s, "have mdsmap %u\n", (unsigned)monc->have_mdsmap); while (nexttid < monc->last_tid) { got = radix_tree_gang_lookup(&monc->statfs_request_tree, diff --git a/src/kernel/mds_client.c b/src/kernel/mds_client.c index 62a2e381ed822..a38b6456751fe 100644 --- a/src/kernel/mds_client.c +++ b/src/kernel/mds_client.c @@ -1462,8 +1462,6 @@ static int __do_request(struct ceph_mds_client *mdsc, ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { dout("do_request no mds or not active, waiting for map\n"); list_add(&req->r_wait, &mdsc->waiting_for_map); - ceph_monc_request_mdsmap(&mdsc->client->monc, - mdsc->mdsmap->m_epoch+1); goto out; } @@ -2526,7 +2524,6 @@ static void delayed_work(struct work_struct *work) container_of(work, struct ceph_mds_client, delayed_work.work); int renew_interval; int renew_caps; - u32 want_map = 0; dout("mdsc delayed_work\n"); ceph_check_delayed_caps(mdsc, 0); @@ -2555,7 +2552,6 @@ static void delayed_work(struct work_struct *work) pr_info("ceph mds%d session probably timed out," " requesting mds map\n", s->s_mds); } - want_map = mdsc->mdsmap->m_epoch + 1; } if (s->s_state < CEPH_MDS_SESSION_OPEN) { /* this mds is failed or recovering, just wait */ @@ -2578,9 +2574,6 @@ static void delayed_work(struct work_struct *work) } mutex_unlock(&mdsc->mutex); - if (want_map) - ceph_monc_request_mdsmap(&mdsc->client->monc, want_map); - schedule_delayed(mdsc); } diff --git a/src/kernel/mon_client.c b/src/kernel/mon_client.c index f134ad7e3871f..aaae7cdf6c62b 100644 --- a/src/kernel/mon_client.c +++ b/src/kernel/mon_client.c @@ -96,35 +96,83 @@ static int open_session(struct ceph_mon_client *monc, int newmon) { char r; - if (!newmon && monc->con) { - dout("open_session mon%d already open\n", monc->last_mon); - return 0; - } + if (!monc->con || newmon) { + if (monc->con) { + dout("open_session closing mon%d\n", monc->cur_mon); + monc->con->ops->put(monc->con); + } - if (monc->con) { - dout("open_session closing mon%d\n", monc->last_mon); - monc->con->ops->put(monc->con); - } + get_random_bytes(&r, 1); + monc->cur_mon = r % monc->monmap->num_mon; + monc->subscribed = false; + monc->sub_sent = 0; - get_random_bytes(&r, 1); - monc->last_mon = r % monc->monmap->num_mon; + monc->con = kzalloc(sizeof(*monc->con), GFP_NOFS); + if (!monc->con) { + pr_err("open_session mon%d ENOMEM\n", monc->cur_mon); + return -ENOMEM; + } - monc->con = kzalloc(sizeof(*monc->con), GFP_NOFS); - if (!monc->con) { - pr_err("open_session mon%d ENOMEM\n", monc->last_mon); - return -ENOMEM; + dout("open_session mon%d opened\n", monc->cur_mon); + ceph_con_init(monc->client->msgr, monc->con, + &monc->monmap->mon_inst[monc->cur_mon].addr); + monc->con->private = monc; + monc->con->ops = &mon_con_ops; + monc->con->peer_name.type = cpu_to_le32(CEPH_ENTITY_TYPE_MON); + monc->con->peer_name.num = cpu_to_le32(monc->cur_mon); + } else { + dout("open_session mon%d already open\n", monc->cur_mon); } - - dout("open_session mon%d opened\n", monc->last_mon); - ceph_con_init(monc->client->msgr, monc->con, - &monc->monmap->mon_inst[monc->last_mon].addr); - monc->con->private = monc; - monc->con->ops = &mon_con_ops; - monc->con->peer_name.type = cpu_to_le32(CEPH_ENTITY_TYPE_MON); - monc->con->peer_name.num = cpu_to_le32(monc->last_mon); return 0; } +static void send_subscribe(struct ceph_mon_client *monc) +{ + if (!monc->subscribed && !monc->sub_sent) { + struct ceph_msg *msg; + struct ceph_mon_subscribe_item *i; + void *p, *end; + + msg = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 64, 0, 0, 0); + if (!msg) + return; + + dout("open_session subscribing to 'mdsmap' at %u\n", + (unsigned)monc->have_mdsmap); + p = msg->front.iov_base; + end = p + msg->front.iov_len; + + ceph_encode_32(&p, 1); + ceph_encode_string(&p, end, "mdsmap", 6); + i = p; + i->have = cpu_to_le64(monc->have_mdsmap); + i->onetime = 0; + p += sizeof(*i); + + msg->front.iov_len = p - msg->front.iov_base; + msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); + ceph_con_send(monc->con, msg); + + monc->sub_sent = jiffies; + } +} + +static void handle_subscribe_ack(struct ceph_mon_client *monc, + struct ceph_msg *msg) +{ + unsigned ms; + void *p = msg->front.iov_base; + void *end = p + msg->front.iov_len; + + dout("handle_subscribe_ack\n"); + ceph_decode_64_safe(&p, end, ms, bad); + monc->subscribed = true; + monc->sub_renew_after = monc->sub_sent + ms / 1000; + +bad: + pr_err("ceph got corrupt subscribe-ack msg\n"); +} + /* * Generic timeout mechanism for monitor requests, so we can resend if * we don't get a timely reply. Exponential backoff. @@ -173,62 +221,15 @@ static void init_request_type(struct ceph_mon_client *monc, } -/* - * Request a new mds map. - */ -static void request_mdsmap(struct ceph_mon_client *monc, int newmon) -{ - struct ceph_msg *msg; - struct ceph_mds_getmap *h; - int err; - - dout("request_mdsmap want %u\n", monc->want_mdsmap); - err = open_session(monc, newmon); - if (err) - return; - msg = ceph_msg_new(CEPH_MSG_MDS_GETMAP, sizeof(*h), 0, 0, NULL); - if (IS_ERR(msg)) - return; - h = msg->front.iov_base; - h->fsid = monc->monmap->fsid; - h->have_version = cpu_to_le64(monc->want_mdsmap - 1); - ceph_con_send(monc->con, msg); -} - -/* - * Register our desire for an mdsmap epoch >= @want. - */ -void ceph_monc_request_mdsmap(struct ceph_mon_client *monc, u32 want) -{ - dout("request_mdsmap want %u\n", want); - mutex_lock(&monc->req_mutex); - if (want > monc->want_mdsmap) { - monc->want_mdsmap = want; - monc->mdsreq.delay = BASE_DELAY_INTERVAL; - request_mdsmap(monc, 0); - reschedule_timeout(&monc->mdsreq); - } - mutex_unlock(&monc->req_mutex); -} - /* * Possibly cancel our desire for a new map */ int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got) { - int ret = 0; - mutex_lock(&monc->req_mutex); - if (got < monc->want_mdsmap) { - dout("got_mdsmap %u < wanted %u\n", got, monc->want_mdsmap); - ret = -EAGAIN; - } else { - dout("got_mdsmap %u >= wanted %u\n", got, monc->want_mdsmap); - monc->want_mdsmap = 0; - cancel_timeout(&monc->mdsreq); - } + monc->have_mdsmap = got; mutex_unlock(&monc->req_mutex); - return ret; + return 0; } @@ -330,7 +331,7 @@ static void handle_mount_ack(struct ceph_mon_client *monc, struct ceph_msg *msg) struct ceph_entity_addr addr; int err = -EINVAL; - if (client->signed_ticket) { + if (client->whoami >= 0) { dout("handle_mount_ack - already mounted\n"); return; } @@ -382,6 +383,8 @@ static void handle_mount_ack(struct ceph_mon_client *monc, struct ceph_msg *msg) le64_to_cpu(__ceph_fsid_minor(&client->monc.monmap->fsid))); ceph_debugfs_client_init(client); + send_subscribe(monc); + err = 0; goto out; @@ -432,15 +435,14 @@ bad: * (re)send a statfs request */ static int send_statfs(struct ceph_mon_client *monc, - struct ceph_mon_statfs_request *req, - int newmon) + struct ceph_mon_statfs_request *req) { struct ceph_msg *msg; struct ceph_mon_statfs *h; int err; dout("send_statfs tid %llu\n", req->tid); - err = open_session(monc, newmon); + err = open_session(monc, 0); if (err) return err; msg = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), 0, 0, NULL); @@ -476,23 +478,18 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) pr_err("ceph ENOMEM in do_statfs\n"); return -ENOMEM; } - if (monc->num_statfs_requests == 0) - schedule_delayed_work(&monc->statfs_delayed_work, - round_jiffies_relative(1*HZ)); monc->num_statfs_requests++; ceph_msgpool_resv(&monc->client->msgpool_statfs_reply, 1); mutex_unlock(&monc->statfs_mutex); /* send request and wait */ - err = send_statfs(monc, &req, 0); + err = send_statfs(monc, &req); if (!err) err = wait_for_completion_interruptible(&req.completion); mutex_lock(&monc->statfs_mutex); radix_tree_delete(&monc->statfs_request_tree, req.tid); monc->num_statfs_requests--; - if (monc->num_statfs_requests == 0) - cancel_delayed_work(&monc->statfs_delayed_work); ceph_msgpool_resv(&monc->client->msgpool_statfs_reply, -1); mutex_unlock(&monc->statfs_mutex); @@ -502,20 +499,15 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) } /* - * Resend any statfs requests that have timed out. + * Resend pending statfs requests. */ -static void do_statfs_check(struct work_struct *work) +static void resend_statfs(struct ceph_mon_client *monc) { - struct ceph_mon_client *monc = - container_of(work, struct ceph_mon_client, - statfs_delayed_work.work); u64 next_tid = 0; int got; int did = 0; - int newmon = 1; struct ceph_mon_statfs_request *req; - dout("do_statfs_check\n"); mutex_lock(&monc->statfs_mutex); while (1) { got = radix_tree_gang_lookup(&monc->statfs_request_tree, @@ -525,19 +517,10 @@ static void do_statfs_check(struct work_struct *work) break; did++; next_tid = req->tid + 1; - if (time_after(jiffies, req->last_attempt + req->delay)) { - req->last_attempt = jiffies; - if (req->delay < MAX_DELAY_INTERVAL) - req->delay *= 2; /* exponential backoff */ - send_statfs(monc, req, newmon); - newmon = 0; - } + + send_statfs(monc, req); } mutex_unlock(&monc->statfs_mutex); - - if (did) - schedule_delayed_work(&monc->statfs_delayed_work, - round_jiffies_relative(1*HZ)); } @@ -551,12 +534,10 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) INIT_RADIX_TREE(&monc->statfs_request_tree, GFP_NOFS); monc->num_statfs_requests = 0; monc->last_tid = 0; - INIT_DELAYED_WORK(&monc->statfs_delayed_work, do_statfs_check); - init_request_type(monc, &monc->mdsreq, request_mdsmap); init_request_type(monc, &monc->osdreq, request_osdmap); init_request_type(monc, &monc->mountreq, request_mount); mutex_init(&monc->req_mutex); - monc->want_mdsmap = 0; + monc->have_mdsmap = 0; monc->want_osdmap = 0; monc->con = NULL; return 0; @@ -565,10 +546,8 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) void ceph_monc_stop(struct ceph_mon_client *monc) { dout("stop\n"); - cancel_timeout(&monc->mdsreq); cancel_timeout(&monc->osdreq); cancel_timeout(&monc->mountreq); - cancel_delayed_work_sync(&monc->statfs_delayed_work); if (monc->con) { monc->con->ops->put(monc->con); monc->con = NULL; @@ -589,6 +568,11 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) case CEPH_MSG_CLIENT_MOUNT_ACK: handle_mount_ack(monc, msg); break; + + case CEPH_MSG_MON_SUBSCRIBE_ACK: + handle_subscribe_ack(monc, msg); + break; + case CEPH_MSG_STATFS_REPLY: handle_statfs_reply(monc, msg); break; @@ -608,10 +592,27 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) ceph_msg_put(msg); } +/* + * If the monitor connection resets, pick a new monitor and resubmit + * any pending requests. + */ +static void mon_reset(struct ceph_connection *con) +{ + struct ceph_mon_client *monc = con->private; + + dout("mon_reset\n"); + if (open_session(monc, 1) < 0) + return; /* delayed work handler will retry */ + + resend_statfs(monc); +} + + const static struct ceph_connection_operations mon_con_ops = { .get = ceph_con_get, .put = ceph_con_put, .dispatch = dispatch, + .peer_reset = mon_reset, .alloc_msg = ceph_alloc_msg, .alloc_middle = ceph_alloc_middle, }; diff --git a/src/kernel/mon_client.h b/src/kernel/mon_client.h index 5e9f5444cb6aa..c1ed012ada2de 100644 --- a/src/kernel/mon_client.h +++ b/src/kernel/mon_client.h @@ -51,9 +51,11 @@ struct ceph_mon_statfs_request { struct ceph_mon_client { struct ceph_client *client; - int last_mon; /* last monitor i contacted */ struct ceph_monmap *monmap; + int cur_mon; /* last monitor i contacted */ + bool subscribed; + unsigned long sub_sent, sub_renew_after; struct ceph_connection *con; /* pending statfs requests */ @@ -61,12 +63,11 @@ struct ceph_mon_client { struct radix_tree_root statfs_request_tree; int num_statfs_requests; u64 last_tid; - struct delayed_work statfs_delayed_work; /* mds/osd map or mount requests */ struct mutex req_mutex; - struct ceph_mon_request mdsreq, osdreq, mountreq; - u32 want_mdsmap; + struct ceph_mon_request osdreq, mountreq; + u32 have_mdsmap; u32 want_osdmap; struct dentry *debugfs_file; @@ -85,15 +86,12 @@ extern void ceph_monc_stop(struct ceph_mon_client *monc); * periodically rerequest the map from the monitor cluster until we * get what we want. */ -extern void ceph_monc_request_mdsmap(struct ceph_mon_client *monc, u32 want); extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 have); extern void ceph_monc_request_osdmap(struct ceph_mon_client *monc, u32 want); extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have); extern void ceph_monc_request_mount(struct ceph_mon_client *monc); -extern void ceph_monc_request_umount(struct ceph_mon_client *monc); -extern void ceph_monc_request_umount(struct ceph_mon_client *monc); extern int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf); diff --git a/src/kernel/super.c b/src/kernel/super.c index b9c3948713be8..6034029784f79 100644 --- a/src/kernel/super.c +++ b/src/kernel/super.c @@ -252,6 +252,8 @@ const char *ceph_msg_type_name(int type) case CEPH_MSG_PING: return "ping"; case CEPH_MSG_MON_MAP: return "mon_map"; case CEPH_MSG_MON_GET_MAP: return "mon_get_map"; + case CEPH_MSG_MON_SUBSCRIBE: return "mon_subscribe"; + case CEPH_MSG_MON_SUBSCRIBE_ACK: return "mon_subscribe_ack"; case CEPH_MSG_CLIENT_MOUNT: return "client_mount"; case CEPH_MSG_CLIENT_MOUNT_ACK: return "client_mount_ack"; case CEPH_MSG_STATFS: return "statfs"; diff --git a/src/kernel/super.h b/src/kernel/super.h index 8a12f9eb275ec..cc54237e98cdd 100644 --- a/src/kernel/super.h +++ b/src/kernel/super.h @@ -110,7 +110,7 @@ static inline unsigned long time_sub(unsigned long a, unsigned long b) * mounting the same ceph filesystem/cluster. */ struct ceph_client { - u32 whoami; /* my client number */ + int whoami; /* my client number */ struct dentry *debugfs_fsid, *debugfs_monmap; struct dentry *debugfs_mdsmap, *debugfs_osdmap; struct dentry *debugfs_dir, *debugfs_dentry_lru; -- 2.39.5