]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: subscribe to mdsmap
authorSage Weil <sage@newdream.net>
Mon, 31 Aug 2009 23:08:48 +0000 (16:08 -0700)
committerSage Weil <sage@newdream.net>
Mon, 31 Aug 2009 23:08:48 +0000 (16:08 -0700)
src/kernel/debugfs.c
src/kernel/mds_client.c
src/kernel/mon_client.c
src/kernel/mon_client.h
src/kernel/super.c
src/kernel/super.h

index e98dec88af476e238974b2250e4875c1799d0c3c..9ee53bfdcf96057f3f2a8d2e82c9754d4f0d6c72 100644 (file)
@@ -126,8 +126,8 @@ static int monc_show(struct seq_file *s, void *p)
 
        if (monc->want_osdmap)
                seq_printf(s, "want osdmap %u\n", (unsigned)monc->want_osdmap);
-       if (monc->want_mdsmap)
-               seq_printf(s, "want mdsmap %u\n", (unsigned)monc->want_mdsmap);
+       if (monc->have_mdsmap)
+               seq_printf(s, "have mdsmap %u\n", (unsigned)monc->have_mdsmap);
 
        while (nexttid < monc->last_tid) {
                got = radix_tree_gang_lookup(&monc->statfs_request_tree,
index 62a2e381ed822af70ce8f61803cb178bb4cc0f32..a38b6456751fe330c91ec27e549e96e3597d87cd 100644 (file)
@@ -1462,8 +1462,6 @@ static int __do_request(struct ceph_mds_client *mdsc,
            ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
                dout("do_request no mds or not active, waiting for map\n");
                list_add(&req->r_wait, &mdsc->waiting_for_map);
-               ceph_monc_request_mdsmap(&mdsc->client->monc,
-                                        mdsc->mdsmap->m_epoch+1);
                goto out;
        }
 
@@ -2526,7 +2524,6 @@ static void delayed_work(struct work_struct *work)
                container_of(work, struct ceph_mds_client, delayed_work.work);
        int renew_interval;
        int renew_caps;
-       u32 want_map = 0;
 
        dout("mdsc delayed_work\n");
        ceph_check_delayed_caps(mdsc, 0);
@@ -2555,7 +2552,6 @@ static void delayed_work(struct work_struct *work)
                                pr_info("ceph mds%d session probably timed out,"
                                        " requesting mds map\n", s->s_mds);
                        }
-                       want_map = mdsc->mdsmap->m_epoch + 1;
                }
                if (s->s_state < CEPH_MDS_SESSION_OPEN) {
                        /* this mds is failed or recovering, just wait */
@@ -2578,9 +2574,6 @@ static void delayed_work(struct work_struct *work)
        }
        mutex_unlock(&mdsc->mutex);
 
-       if (want_map)
-               ceph_monc_request_mdsmap(&mdsc->client->monc, want_map);
-
        schedule_delayed(mdsc);
 }
 
index f134ad7e3871fcc0671aac997fbc0c26e3b582ac..aaae7cdf6c62b9a61f5ab6655db0c8f90dbe7918 100644 (file)
@@ -96,35 +96,83 @@ static int open_session(struct ceph_mon_client *monc, int newmon)
 {
        char r;
 
-       if (!newmon && monc->con) {
-               dout("open_session mon%d already open\n", monc->last_mon);
-               return 0;
-       }
+       if (!monc->con || newmon) {
+               if (monc->con) {
+                       dout("open_session closing mon%d\n", monc->cur_mon);
+                       monc->con->ops->put(monc->con);
+               }
 
-       if (monc->con) {
-               dout("open_session closing mon%d\n", monc->last_mon);
-               monc->con->ops->put(monc->con);
-       }
+               get_random_bytes(&r, 1);
+               monc->cur_mon = r % monc->monmap->num_mon;
+               monc->subscribed = false;
+               monc->sub_sent = 0;
 
-       get_random_bytes(&r, 1);
-       monc->last_mon = r % monc->monmap->num_mon;
+               monc->con = kzalloc(sizeof(*monc->con), GFP_NOFS);
+               if (!monc->con) {
+                       pr_err("open_session mon%d ENOMEM\n", monc->cur_mon);
+                       return -ENOMEM;
+               }
 
-       monc->con = kzalloc(sizeof(*monc->con), GFP_NOFS);
-       if (!monc->con) {
-               pr_err("open_session mon%d ENOMEM\n", monc->last_mon);
-               return -ENOMEM;
+               dout("open_session mon%d opened\n", monc->cur_mon);
+               ceph_con_init(monc->client->msgr, monc->con,
+                             &monc->monmap->mon_inst[monc->cur_mon].addr);
+               monc->con->private = monc;
+               monc->con->ops = &mon_con_ops;
+               monc->con->peer_name.type = cpu_to_le32(CEPH_ENTITY_TYPE_MON);
+               monc->con->peer_name.num = cpu_to_le32(monc->cur_mon);
+       } else {
+               dout("open_session mon%d already open\n", monc->cur_mon);
        }
-
-       dout("open_session mon%d opened\n", monc->last_mon);
-       ceph_con_init(monc->client->msgr, monc->con,
-                     &monc->monmap->mon_inst[monc->last_mon].addr);
-       monc->con->private = monc;
-       monc->con->ops = &mon_con_ops;
-       monc->con->peer_name.type = cpu_to_le32(CEPH_ENTITY_TYPE_MON);
-       monc->con->peer_name.num = cpu_to_le32(monc->last_mon);
        return 0;
 }
 
+static void send_subscribe(struct ceph_mon_client *monc)
+{
+       if (!monc->subscribed && !monc->sub_sent) {
+               struct ceph_msg *msg;
+               struct ceph_mon_subscribe_item *i;
+               void *p, *end;
+
+               msg = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 64, 0, 0, 0);
+               if (!msg)
+                       return;
+
+               dout("open_session subscribing to 'mdsmap' at %u\n",
+                    (unsigned)monc->have_mdsmap);
+               p = msg->front.iov_base;
+               end = p + msg->front.iov_len;
+
+               ceph_encode_32(&p, 1);
+               ceph_encode_string(&p, end, "mdsmap", 6);
+               i = p;
+               i->have = cpu_to_le64(monc->have_mdsmap);
+               i->onetime = 0;
+               p += sizeof(*i);
+
+               msg->front.iov_len = p - msg->front.iov_base;
+               msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+               ceph_con_send(monc->con, msg);
+
+               monc->sub_sent = jiffies;
+       }
+}
+
+static void handle_subscribe_ack(struct ceph_mon_client *monc,
+                                struct ceph_msg *msg)
+{
+       unsigned ms;
+       void *p = msg->front.iov_base;
+       void *end = p + msg->front.iov_len;
+
+       dout("handle_subscribe_ack\n");
+       ceph_decode_64_safe(&p, end, ms, bad);
+       monc->subscribed = true;
+       monc->sub_renew_after = monc->sub_sent + ms / 1000;
+       
+bad:
+       pr_err("ceph got corrupt subscribe-ack msg\n");
+}
+
 /*
  * Generic timeout mechanism for monitor requests, so we can resend if
  * we don't get a timely reply.  Exponential backoff.
@@ -173,62 +221,15 @@ static void init_request_type(struct ceph_mon_client *monc,
 }
 
 
-/*
- * Request a new mds map.
- */
-static void request_mdsmap(struct ceph_mon_client *monc, int newmon)
-{
-       struct ceph_msg *msg;
-       struct ceph_mds_getmap *h;
-       int err;
-
-       dout("request_mdsmap want %u\n", monc->want_mdsmap);
-       err = open_session(monc, newmon);
-       if (err)
-               return;
-       msg = ceph_msg_new(CEPH_MSG_MDS_GETMAP, sizeof(*h), 0, 0, NULL);
-       if (IS_ERR(msg))
-               return;
-       h = msg->front.iov_base;
-       h->fsid = monc->monmap->fsid;
-       h->have_version = cpu_to_le64(monc->want_mdsmap - 1);
-       ceph_con_send(monc->con, msg);
-}
-
-/*
- * Register our desire for an mdsmap epoch >= @want.
- */
-void ceph_monc_request_mdsmap(struct ceph_mon_client *monc, u32 want)
-{
-       dout("request_mdsmap want %u\n", want);
-       mutex_lock(&monc->req_mutex);
-       if (want > monc->want_mdsmap) {
-               monc->want_mdsmap = want;
-               monc->mdsreq.delay = BASE_DELAY_INTERVAL;
-               request_mdsmap(monc, 0);
-               reschedule_timeout(&monc->mdsreq);
-       }
-       mutex_unlock(&monc->req_mutex);
-}
-
 /*
  * Possibly cancel our desire for a new map
  */
 int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
 {
-       int ret = 0;
-
        mutex_lock(&monc->req_mutex);
-       if (got < monc->want_mdsmap) {
-               dout("got_mdsmap %u < wanted %u\n", got, monc->want_mdsmap);
-               ret = -EAGAIN;
-       } else {
-               dout("got_mdsmap %u >= wanted %u\n", got, monc->want_mdsmap);
-               monc->want_mdsmap = 0;
-               cancel_timeout(&monc->mdsreq);
-       }
+       monc->have_mdsmap = got;
        mutex_unlock(&monc->req_mutex);
-       return ret;
+       return 0;
 }
 
 
@@ -330,7 +331,7 @@ static void handle_mount_ack(struct ceph_mon_client *monc, struct ceph_msg *msg)
        struct ceph_entity_addr addr;
        int err = -EINVAL;
 
-       if (client->signed_ticket) {
+       if (client->whoami >= 0) {
                dout("handle_mount_ack - already mounted\n");
                return;
        }
@@ -382,6 +383,8 @@ static void handle_mount_ack(struct ceph_mon_client *monc, struct ceph_msg *msg)
                le64_to_cpu(__ceph_fsid_minor(&client->monc.monmap->fsid)));
        ceph_debugfs_client_init(client);
 
+       send_subscribe(monc);
+
        err = 0;
        goto out;
 
@@ -432,15 +435,14 @@ bad:
  * (re)send a statfs request
  */
 static int send_statfs(struct ceph_mon_client *monc,
-                      struct ceph_mon_statfs_request *req,
-                      int newmon)
+                      struct ceph_mon_statfs_request *req)
 {
        struct ceph_msg *msg;
        struct ceph_mon_statfs *h;
        int err;
 
        dout("send_statfs tid %llu\n", req->tid);
-       err = open_session(monc, newmon);
+       err = open_session(monc, 0);
        if (err)
                return err;
        msg = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), 0, 0, NULL);
@@ -476,23 +478,18 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
                pr_err("ceph ENOMEM in do_statfs\n");
                return -ENOMEM;
        }
-       if (monc->num_statfs_requests == 0)
-               schedule_delayed_work(&monc->statfs_delayed_work,
-                                     round_jiffies_relative(1*HZ));
        monc->num_statfs_requests++;
        ceph_msgpool_resv(&monc->client->msgpool_statfs_reply, 1);
        mutex_unlock(&monc->statfs_mutex);
 
        /* send request and wait */
-       err = send_statfs(monc, &req, 0);
+       err = send_statfs(monc, &req);
        if (!err)
                err = wait_for_completion_interruptible(&req.completion);
 
        mutex_lock(&monc->statfs_mutex);
        radix_tree_delete(&monc->statfs_request_tree, req.tid);
        monc->num_statfs_requests--;
-       if (monc->num_statfs_requests == 0)
-               cancel_delayed_work(&monc->statfs_delayed_work);
        ceph_msgpool_resv(&monc->client->msgpool_statfs_reply, -1);
        mutex_unlock(&monc->statfs_mutex);
 
@@ -502,20 +499,15 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
 }
 
 /*
- * Resend any statfs requests that have timed out.
+ * Resend pending statfs requests.
  */
-static void do_statfs_check(struct work_struct *work)
+static void resend_statfs(struct ceph_mon_client *monc)
 {
-       struct ceph_mon_client *monc =
-               container_of(work, struct ceph_mon_client,
-                            statfs_delayed_work.work);
        u64 next_tid = 0;
        int got;
        int did = 0;
-       int newmon = 1;
        struct ceph_mon_statfs_request *req;
 
-       dout("do_statfs_check\n");
        mutex_lock(&monc->statfs_mutex);
        while (1) {
                got = radix_tree_gang_lookup(&monc->statfs_request_tree,
@@ -525,19 +517,10 @@ static void do_statfs_check(struct work_struct *work)
                        break;
                did++;
                next_tid = req->tid + 1;
-               if (time_after(jiffies, req->last_attempt + req->delay)) {
-                       req->last_attempt = jiffies;
-                       if (req->delay < MAX_DELAY_INTERVAL)
-                               req->delay *= 2; /* exponential backoff */
-                       send_statfs(monc, req, newmon);
-                       newmon = 0;
-               }
+
+               send_statfs(monc, req);
        }
        mutex_unlock(&monc->statfs_mutex);
-
-       if (did)
-               schedule_delayed_work(&monc->statfs_delayed_work,
-                                     round_jiffies_relative(1*HZ));
 }
 
 
@@ -551,12 +534,10 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
        INIT_RADIX_TREE(&monc->statfs_request_tree, GFP_NOFS);
        monc->num_statfs_requests = 0;
        monc->last_tid = 0;
-       INIT_DELAYED_WORK(&monc->statfs_delayed_work, do_statfs_check);
-       init_request_type(monc, &monc->mdsreq, request_mdsmap);
        init_request_type(monc, &monc->osdreq, request_osdmap);
        init_request_type(monc, &monc->mountreq, request_mount);
        mutex_init(&monc->req_mutex);
-       monc->want_mdsmap = 0;
+       monc->have_mdsmap = 0;
        monc->want_osdmap = 0;
        monc->con = NULL;
        return 0;
@@ -565,10 +546,8 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
 void ceph_monc_stop(struct ceph_mon_client *monc)
 {
        dout("stop\n");
-       cancel_timeout(&monc->mdsreq);
        cancel_timeout(&monc->osdreq);
        cancel_timeout(&monc->mountreq);
-       cancel_delayed_work_sync(&monc->statfs_delayed_work);
        if (monc->con) {
                monc->con->ops->put(monc->con);
                monc->con = NULL;
@@ -589,6 +568,11 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
        case CEPH_MSG_CLIENT_MOUNT_ACK:
                handle_mount_ack(monc, msg);
                break;
+
+       case CEPH_MSG_MON_SUBSCRIBE_ACK:
+               handle_subscribe_ack(monc, msg);
+               break;
+
        case CEPH_MSG_STATFS_REPLY:
                handle_statfs_reply(monc, msg);
                break;
@@ -608,10 +592,27 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
        ceph_msg_put(msg);
 }
 
+/*
+ * If the monitor connection resets, pick a new monitor and resubmit
+ * any pending requests.
+ */
+static void mon_reset(struct ceph_connection *con)
+{
+       struct ceph_mon_client *monc = con->private;
+
+       dout("mon_reset\n");
+       if (open_session(monc, 1) < 0)
+               return;   /* delayed work handler will retry */
+
+       resend_statfs(monc);
+}
+
+
 const static struct ceph_connection_operations mon_con_ops = {
        .get = ceph_con_get,
        .put = ceph_con_put,
        .dispatch = dispatch,
+       .peer_reset = mon_reset,
        .alloc_msg = ceph_alloc_msg,
        .alloc_middle = ceph_alloc_middle,
 };
index 5e9f5444cb6aaf3925bf1d8b2dc248f79897f7cc..c1ed012ada2de8eaa125fb6eaad6a5a1bd357b38 100644 (file)
@@ -51,9 +51,11 @@ struct ceph_mon_statfs_request {
 
 struct ceph_mon_client {
        struct ceph_client *client;
-       int last_mon;                       /* last monitor i contacted */
        struct ceph_monmap *monmap;
 
+       int cur_mon;                       /* last monitor i contacted */
+       bool subscribed;
+       unsigned long sub_sent, sub_renew_after;
        struct ceph_connection *con;
 
        /* pending statfs requests */
@@ -61,12 +63,11 @@ struct ceph_mon_client {
        struct radix_tree_root statfs_request_tree;
        int num_statfs_requests;
        u64 last_tid;
-       struct delayed_work statfs_delayed_work;
 
        /* mds/osd map or mount requests */
        struct mutex req_mutex;
-       struct ceph_mon_request mdsreq, osdreq, mountreq;
-       u32 want_mdsmap;
+       struct ceph_mon_request osdreq, mountreq;
+       u32 have_mdsmap;
        u32 want_osdmap;
 
        struct dentry *debugfs_file;
@@ -85,15 +86,12 @@ extern void ceph_monc_stop(struct ceph_mon_client *monc);
  * periodically rerequest the map from the monitor cluster until we
  * get what we want.
  */
-extern void ceph_monc_request_mdsmap(struct ceph_mon_client *monc, u32 want);
 extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 have);
 
 extern void ceph_monc_request_osdmap(struct ceph_mon_client *monc, u32 want);
 extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have);
 
 extern void ceph_monc_request_mount(struct ceph_mon_client *monc);
-extern void ceph_monc_request_umount(struct ceph_mon_client *monc);
-extern void ceph_monc_request_umount(struct ceph_mon_client *monc);
 
 extern int ceph_monc_do_statfs(struct ceph_mon_client *monc,
                               struct ceph_statfs *buf);
index b9c3948713be8411e00945fbe3ff0a3761d3b3d5..6034029784f79f8f50a1464ce81e0f07ca1ac046 100644 (file)
@@ -252,6 +252,8 @@ const char *ceph_msg_type_name(int type)
        case CEPH_MSG_PING: return "ping";
        case CEPH_MSG_MON_MAP: return "mon_map";
        case CEPH_MSG_MON_GET_MAP: return "mon_get_map";
+       case CEPH_MSG_MON_SUBSCRIBE: return "mon_subscribe";
+       case CEPH_MSG_MON_SUBSCRIBE_ACK: return "mon_subscribe_ack";
        case CEPH_MSG_CLIENT_MOUNT: return "client_mount";
        case CEPH_MSG_CLIENT_MOUNT_ACK: return "client_mount_ack";
        case CEPH_MSG_STATFS: return "statfs";
index 8a12f9eb275ecbbe052fc9f2947b727ac745b820..cc54237e98cddebf0da4a4b87b6786e3ef9208f9 100644 (file)
@@ -110,7 +110,7 @@ static inline unsigned long time_sub(unsigned long a, unsigned long b)
  * mounting the same ceph filesystem/cluster.
  */
 struct ceph_client {
-       u32 whoami;                   /* my client number */
+       int whoami;                   /* my client number */
        struct dentry *debugfs_fsid, *debugfs_monmap;
        struct dentry *debugfs_mdsmap, *debugfs_osdmap;
        struct dentry *debugfs_dir, *debugfs_dentry_lru;