]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: use subscribe to get mdsmap, osdmap updates
authorSage Weil <sage@newdream.net>
Tue, 1 Sep 2009 18:04:03 +0000 (11:04 -0700)
committerSage Weil <sage@newdream.net>
Tue, 1 Sep 2009 18:04:03 +0000 (11:04 -0700)
src/kernel/debugfs.c
src/kernel/mon_client.c
src/kernel/mon_client.h
src/kernel/osd_client.c
src/kernel/super.c

index 9ee53bfdcf96057f3f2a8d2e82c9754d4f0d6c72..5de58fef5d676609a7cb72754bdf86482b968f6a 100644 (file)
@@ -122,12 +122,14 @@ static int monc_show(struct seq_file *s, void *p)
        int got;
        struct ceph_mon_client *monc = &client->monc;
 
-       mutex_lock(&monc->statfs_mutex);
+       mutex_lock(&monc->mutex);
 
-       if (monc->want_osdmap)
-               seq_printf(s, "want osdmap %u\n", (unsigned)monc->want_osdmap);
        if (monc->have_mdsmap)
                seq_printf(s, "have mdsmap %u\n", (unsigned)monc->have_mdsmap);
+       if (monc->have_osdmap)
+               seq_printf(s, "have osdmap %u\n", (unsigned)monc->have_osdmap);
+       if (monc->want_next_osdmap)
+               seq_printf(s, "want next osdmap\n");
 
        while (nexttid < monc->last_tid) {
                got = radix_tree_gang_lookup(&monc->statfs_request_tree,
@@ -138,7 +140,7 @@ static int monc_show(struct seq_file *s, void *p)
 
                seq_printf(s, "%lld statfs\n", req->tid);
        }
-       mutex_unlock(&monc->statfs_mutex);
+       mutex_unlock(&monc->mutex);
 
        return 0;
 }
index aaae7cdf6c62b9a61f5ab6655db0c8f90dbe7918..f86c85412ab9f9947580c1a83e07238aa04ebe0d 100644 (file)
@@ -92,7 +92,7 @@ int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
 /*
  * Open a session with a (new) monitor.
  */
-static int open_session(struct ceph_mon_client *monc, int newmon)
+static int __open_session(struct ceph_mon_client *monc, int newmon)
 {
        char r;
 
@@ -126,9 +126,29 @@ static int open_session(struct ceph_mon_client *monc, int newmon)
        return 0;
 }
 
-static void send_subscribe(struct ceph_mon_client *monc)
+/*
+ * Reschedule delayed work timer.
+ */
+static void __schedule_delayed(struct ceph_mon_client *monc)
+{
+       unsigned delay;
+
+       if (monc->want_mount)
+               delay = 10 * HZ;
+       else
+               delay = 20 * HZ;
+       dout("__schedule_delayed after %u\n", delay);
+       schedule_delayed_work(&monc->delayed_work, delay);
+}
+
+/*
+ * Send subscribe request for mdsmap and/or osdmap.
+ */
+static void __send_subscribe(struct ceph_mon_client *monc)
 {
-       if (!monc->subscribed && !monc->sub_sent) {
+       if ((!monc->subscribed && !monc->sub_sent) ||
+           time_before(jiffies, monc->sub_renew_after) ||
+           !monc->want_next_osdmap) {
                struct ceph_msg *msg;
                struct ceph_mon_subscribe_item *i;
                void *p, *end;
@@ -142,12 +162,19 @@ static void send_subscribe(struct ceph_mon_client *monc)
                p = msg->front.iov_base;
                end = p + msg->front.iov_len;
 
-               ceph_encode_32(&p, 1);
+               ceph_encode_32(&p, 1 + monc->want_next_osdmap);
                ceph_encode_string(&p, end, "mdsmap", 6);
                i = p;
                i->have = cpu_to_le64(monc->have_mdsmap);
                i->onetime = 0;
                p += sizeof(*i);
+               if (monc->want_next_osdmap) {
+                       ceph_encode_string(&p, end, "osdmap", 6);
+                       i = p;
+                       i->have = cpu_to_le64(monc->have_osdmap);
+                       i->onetime = 1;
+                       p += sizeof(*i);
+               }
 
                msg->front.iov_len = p - msg->front.iov_base;
                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
@@ -160,143 +187,63 @@ static void send_subscribe(struct ceph_mon_client *monc)
 static void handle_subscribe_ack(struct ceph_mon_client *monc,
                                 struct ceph_msg *msg)
 {
-       unsigned ms;
+       unsigned seconds;
        void *p = msg->front.iov_base;
        void *end = p + msg->front.iov_len;
 
        dout("handle_subscribe_ack\n");
-       ceph_decode_64_safe(&p, end, ms, bad);
+       ceph_decode_64_safe(&p, end, seconds, bad);
        monc->subscribed = true;
-       monc->sub_renew_after = monc->sub_sent + ms / 1000;
+       monc->sub_renew_after = monc->sub_sent + seconds*HZ;
        
 bad:
        pr_err("ceph got corrupt subscribe-ack msg\n");
 }
 
 /*
- * Generic timeout mechanism for monitor requests, so we can resend if
- * we don't get a timely reply.  Exponential backoff.
- */
-static void reschedule_timeout(struct ceph_mon_request *req)
-{
-       schedule_delayed_work(&req->delayed_work, req->delay);
-       if (req->delay < MAX_DELAY_INTERVAL)
-               req->delay *= 2;
-       else
-               req->delay = MAX_DELAY_INTERVAL;
-}
-
-static void retry_request(struct work_struct *work)
-{
-       struct ceph_mon_request *req =
-               container_of(work, struct ceph_mon_request,
-                            delayed_work.work);
-
-       /*
-        * if lock is contended, reschedule sooner.  we can't wait for
-        * mutex because we cancel the timeout sync with lock held.
-        */
-       if (mutex_trylock(&req->monc->req_mutex)) {
-               req->do_request(req->monc, 1);
-               reschedule_timeout(req);
-               mutex_unlock(&req->monc->req_mutex);
-       } else
-               schedule_delayed_work(&req->delayed_work, BASE_DELAY_INTERVAL);
-}
-
-static void cancel_timeout(struct ceph_mon_request *req)
-{
-       cancel_delayed_work_sync(&req->delayed_work);
-       req->delay = BASE_DELAY_INTERVAL;
-}
-
-static void init_request_type(struct ceph_mon_client *monc,
-                             struct ceph_mon_request *req,
-                             ceph_monc_request_func_t func)
-{
-       req->monc = monc;
-       INIT_DELAYED_WORK(&req->delayed_work, retry_request);
-       req->delay = 0;
-       req->do_request = func;
-}
-
-
-/*
- * Possibly cancel our desire for a new map
+ * Keep track of which maps we have
  */
 int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
 {
-       mutex_lock(&monc->req_mutex);
+       mutex_lock(&monc->mutex);
        monc->have_mdsmap = got;
-       mutex_unlock(&monc->req_mutex);
+       mutex_unlock(&monc->mutex);
        return 0;
 }
 
-
-/*
- * osd map
- */
-static void request_osdmap(struct ceph_mon_client *monc, int newmon)
-{
-       struct ceph_msg *msg;
-       struct ceph_osd_getmap *h;
-       int err;
-
-       dout("request_osdmap want %u\n", monc->want_osdmap);
-       err = open_session(monc, newmon);
-       if (err)
-               return;
-       msg = ceph_msg_new(CEPH_MSG_OSD_GETMAP, sizeof(*h), 0, 0, NULL);
-       if (IS_ERR(msg))
-               return;
-       h = msg->front.iov_base;
-       h->fsid = monc->monmap->fsid;
-       h->start = cpu_to_le32(monc->want_osdmap);
-       h->have_version = cpu_to_le64(monc->want_osdmap ?
-                                     monc->want_osdmap-1 : 0);
-       ceph_con_send(monc->con, msg);
-}
-
-void ceph_monc_request_osdmap(struct ceph_mon_client *monc, u32 want)
+int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
 {
-       dout("request_osdmap want %u\n", want);
-       mutex_lock(&monc->req_mutex);
-       monc->osdreq.delay = BASE_DELAY_INTERVAL;
-       monc->want_osdmap = want;
-       request_osdmap(monc, 0);
-       reschedule_timeout(&monc->osdreq);
-       mutex_unlock(&monc->req_mutex);
+       mutex_lock(&monc->mutex);
+       monc->have_osdmap = got;
+       monc->want_next_osdmap = false;
+       mutex_unlock(&monc->mutex);
+       return 0;
 }
 
-int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
+/*
+ * Register interest in the next osdmap
+ */
+void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
 {
-       int ret = 0;
-
-       mutex_lock(&monc->req_mutex);
-       if (got < monc->want_osdmap) {
-               dout("got_osdmap %u < wanted %u\n", got, monc->want_osdmap);
-               ret = -EAGAIN;
-       } else {
-               dout("got_osdmap %u >= wanted %u\n", got, monc->want_osdmap);
-               monc->want_osdmap = 0;
-               cancel_timeout(&monc->osdreq);
-       }
-       mutex_unlock(&monc->req_mutex);
-       return ret;
+       dout("request_next_osdmap have %u\n", monc->have_osdmap);
+       mutex_lock(&monc->mutex);
+       monc->want_next_osdmap = true;
+       __send_subscribe(monc);
+       mutex_unlock(&monc->mutex);
 }
 
 
 /*
  * mount
  */
-static void request_mount(struct ceph_mon_client *monc, int newmon)
+static void __request_mount(struct ceph_mon_client *monc)
 {
        struct ceph_msg *msg;
        struct ceph_client_mount *h;
        int err;
 
-       dout("request_mount\n");
-       err = open_session(monc, newmon);
+       dout("__request_mount\n");
+       err = __open_session(monc, 1);
        if (err)
                return;
        msg = ceph_msg_new(CEPH_MSG_CLIENT_MOUNT, sizeof(*h), 0, 0, NULL);
@@ -309,11 +256,10 @@ static void request_mount(struct ceph_mon_client *monc, int newmon)
 
 void ceph_monc_request_mount(struct ceph_mon_client *monc)
 {
-       mutex_lock(&monc->req_mutex);
-       monc->mountreq.delay = BASE_DELAY_INTERVAL;
-       request_mount(monc, 0);
-       reschedule_timeout(&monc->mountreq);
-       mutex_unlock(&monc->req_mutex);
+       mutex_lock(&monc->mutex);
+       __request_mount(monc);
+       __schedule_delayed(monc);
+       mutex_unlock(&monc->mutex);
 }
 
 /*
@@ -336,6 +282,8 @@ static void handle_mount_ack(struct ceph_mon_client *monc, struct ceph_msg *msg)
                return;
        }
 
+       mutex_lock(&monc->mutex);
+
        dout("handle_mount_ack\n");
        p = msg->front.iov_base;
        end = p + msg->front.iov_len;
@@ -371,19 +319,19 @@ static void handle_mount_ack(struct ceph_mon_client *monc, struct ceph_msg *msg)
        client->signed_ticket = NULL;
        client->signed_ticket_len = 0;
 
+       monc->want_mount = false;
+
        client->whoami = cnum;
        client->msgr->inst.name.num = cpu_to_le32(cnum);
        client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
-
        memcpy(&client->msgr->inst.addr, &addr, sizeof(addr));
-
        pr_info("ceph client%d %u.%u.%u.%u:%u fsid %llx.%llx\n", client->whoami,
                IPQUADPORT(addr.ipaddr),
                le64_to_cpu(__ceph_fsid_major(&client->monc.monmap->fsid)),
                le64_to_cpu(__ceph_fsid_minor(&client->monc.monmap->fsid)));
-       ceph_debugfs_client_init(client);
 
-       send_subscribe(monc);
+       ceph_debugfs_client_init(client);
+       __send_subscribe(monc);
 
        err = 0;
        goto out;
@@ -392,9 +340,7 @@ bad:
        pr_err("ceph error decoding mount_ack message\n");
 out:
        client->mount_err = err;
-       mutex_lock(&monc->req_mutex);
-       cancel_timeout(&monc->mountreq);
-       mutex_unlock(&monc->req_mutex);
+       mutex_unlock(&monc->mutex);
        wake_up(&client->mount_wq);
 }
 
@@ -416,13 +362,13 @@ static void handle_statfs_reply(struct ceph_mon_client *monc,
        tid = le64_to_cpu(reply->tid);
        dout("handle_statfs_reply %p tid %llu\n", msg, tid);
 
-       mutex_lock(&monc->statfs_mutex);
+       mutex_lock(&monc->mutex);
        req = radix_tree_lookup(&monc->statfs_request_tree, tid);
        if (req) {
                *req->buf = reply->st;
                req->result = 0;
        }
-       mutex_unlock(&monc->statfs_mutex);
+       mutex_unlock(&monc->mutex);
        if (req)
                complete(&req->completion);
        return;
@@ -442,7 +388,7 @@ static int send_statfs(struct ceph_mon_client *monc,
        int err;
 
        dout("send_statfs tid %llu\n", req->tid);
-       err = open_session(monc, 0);
+       err = __open_session(monc, 0);
        if (err)
                return err;
        msg = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), 0, 0, NULL);
@@ -469,29 +415,29 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
        init_completion(&req.completion);
 
        /* register request */
-       mutex_lock(&monc->statfs_mutex);
+       mutex_lock(&monc->mutex);
        req.tid = ++monc->last_tid;
        req.last_attempt = jiffies;
        req.delay = BASE_DELAY_INTERVAL;
        if (radix_tree_insert(&monc->statfs_request_tree, req.tid, &req) < 0) {
-               mutex_unlock(&monc->statfs_mutex);
+               mutex_unlock(&monc->mutex);
                pr_err("ceph ENOMEM in do_statfs\n");
                return -ENOMEM;
        }
        monc->num_statfs_requests++;
        ceph_msgpool_resv(&monc->client->msgpool_statfs_reply, 1);
-       mutex_unlock(&monc->statfs_mutex);
+       mutex_unlock(&monc->mutex);
 
        /* send request and wait */
        err = send_statfs(monc, &req);
        if (!err)
                err = wait_for_completion_interruptible(&req.completion);
 
-       mutex_lock(&monc->statfs_mutex);
+       mutex_lock(&monc->mutex);
        radix_tree_delete(&monc->statfs_request_tree, req.tid);
        monc->num_statfs_requests--;
        ceph_msgpool_resv(&monc->client->msgpool_statfs_reply, -1);
-       mutex_unlock(&monc->statfs_mutex);
+       mutex_unlock(&monc->mutex);
 
        if (!err)
                err = req.result;
@@ -508,7 +454,7 @@ static void resend_statfs(struct ceph_mon_client *monc)
        int did = 0;
        struct ceph_mon_statfs_request *req;
 
-       mutex_lock(&monc->statfs_mutex);
+       mutex_lock(&monc->mutex);
        while (1) {
                got = radix_tree_gang_lookup(&monc->statfs_request_tree,
                                             (void **)&req,
@@ -520,9 +466,30 @@ static void resend_statfs(struct ceph_mon_client *monc)
 
                send_statfs(monc, req);
        }
-       mutex_unlock(&monc->statfs_mutex);
+       mutex_unlock(&monc->mutex);
 }
 
+/*
+ * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
+ * renew/retry subscription as needed (in case it is timing out, or we
+ * got an ENOMEM).  And keep the monitor connection alive.
+ */
+static void delayed_work(struct work_struct *work)
+{
+       struct ceph_mon_client *monc =
+               container_of(work, struct ceph_mon_client, delayed_work.work);
+
+       dout("monc delayed_work\n");
+       mutex_lock(&monc->mutex);
+       if (monc->want_mount) {
+               __request_mount(monc);
+       } else {
+               ceph_con_keepalive(monc->con);
+               __send_subscribe(monc);
+       }
+       __schedule_delayed(monc);
+       mutex_unlock(&monc->mutex);
+}
 
 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
 {
@@ -530,24 +497,25 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
        memset(monc, 0, sizeof(*monc));
        monc->client = cl;
        monc->monmap = NULL;
-       mutex_init(&monc->statfs_mutex);
+       mutex_init(&monc->mutex);
+       monc->con = NULL;
+
+       INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
        INIT_RADIX_TREE(&monc->statfs_request_tree, GFP_NOFS);
        monc->num_statfs_requests = 0;
        monc->last_tid = 0;
-       init_request_type(monc, &monc->osdreq, request_osdmap);
-       init_request_type(monc, &monc->mountreq, request_mount);
-       mutex_init(&monc->req_mutex);
+
        monc->have_mdsmap = 0;
-       monc->want_osdmap = 0;
-       monc->con = NULL;
+       monc->have_osdmap = 0;
+       monc->want_next_osdmap = true;
+       monc->want_mount = true;
        return 0;
 }
 
 void ceph_monc_stop(struct ceph_mon_client *monc)
 {
        dout("stop\n");
-       cancel_timeout(&monc->osdreq);
-       cancel_timeout(&monc->mountreq);
+       cancel_delayed_work_sync(&monc->delayed_work);
        if (monc->con) {
                monc->con->ops->put(monc->con);
                monc->con = NULL;
@@ -601,9 +569,10 @@ static void mon_reset(struct ceph_connection *con)
        struct ceph_mon_client *monc = con->private;
 
        dout("mon_reset\n");
-       if (open_session(monc, 1) < 0)
+       if (__open_session(monc, 1) < 0)
                return;   /* delayed work handler will retry */
 
+       __send_subscribe(monc);
        resend_statfs(monc);
 }
 
index c1ed012ada2de8eaa125fb6eaad6a5a1bd357b38..a49a5d59ef0bf9a8e95a729d070509808ba23b97 100644 (file)
@@ -53,22 +53,22 @@ struct ceph_mon_client {
        struct ceph_client *client;
        struct ceph_monmap *monmap;
 
+       struct mutex mutex;
+       struct delayed_work delayed_work;
+
        int cur_mon;                       /* last monitor i contacted */
        bool subscribed;
        unsigned long sub_sent, sub_renew_after;
        struct ceph_connection *con;
 
        /* pending statfs requests */
-       struct mutex statfs_mutex;
        struct radix_tree_root statfs_request_tree;
        int num_statfs_requests;
        u64 last_tid;
 
        /* mds/osd map or mount requests */
-       struct mutex req_mutex;
-       struct ceph_mon_request osdreq, mountreq;
-       u32 have_mdsmap;
-       u32 want_osdmap;
+       bool want_mount, want_next_osdmap;
+       u32 have_osdmap, have_mdsmap;
 
        struct dentry *debugfs_file;
 };
@@ -87,10 +87,10 @@ extern void ceph_monc_stop(struct ceph_mon_client *monc);
  * get what we want.
  */
 extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 have);
-
-extern void ceph_monc_request_osdmap(struct ceph_mon_client *monc, u32 want);
 extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have);
 
+extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc);
+
 extern void ceph_monc_request_mount(struct ceph_mon_client *monc);
 
 extern int ceph_monc_do_statfs(struct ceph_mon_client *monc,
index 12e0d8de46a0819b4f2f3eae14fad47d939b3a76..cb77848c7064ce3df8c0c1133d2b3e06218d7b68 100644 (file)
@@ -523,8 +523,7 @@ static int send_request(struct ceph_osd_client *osdc,
                return err;
        if (req->r_osd == NULL) {
                dout("send_request %p no up osds in pg\n", req);
-               ceph_monc_request_osdmap(&osdc->client->monc,
-                                        osdc->osdmap->epoch+1);
+               ceph_monc_request_next_osdmap(&osdc->client->monc);
                return 0;
        }
 
@@ -565,7 +564,7 @@ static void handle_timeout(struct work_struct *work)
        dout("timeout\n");
        down_read(&osdc->map_sem);
 
-       ceph_monc_request_osdmap(&osdc->client->monc, osdc->osdmap->epoch+1);
+       ceph_monc_request_next_osdmap(&osdc->client->monc);
 
        mutex_lock(&osdc->request_mutex);
        for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
@@ -770,8 +769,7 @@ static void kick_requests(struct ceph_osd_client *osdc,
 
        if (needmap) {
                dout("%d requests for down osds, need new map\n", needmap);
-               ceph_monc_request_osdmap(&osdc->client->monc,
-                                        osdc->osdmap->epoch+1);
+               ceph_monc_request_next_osdmap(&osdc->client->monc);
        }
 }
 
index 6034029784f79f8f50a1464ce81e0f07ca1ac046..852e67c108f5fe17492b23eabc9da2b7b72bec48 100644 (file)
@@ -727,7 +727,6 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
 {
        struct ceph_entity_addr *myaddr = NULL;
        int err;
-       int request_interval = 5 * HZ;
        unsigned long timeout = client->mount_args.mount_timeout * HZ;
        unsigned long started = jiffies;  /* note the start time */
        struct dentry *root;
@@ -756,10 +755,10 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
                        goto out;
 
                /* wait */
-               dout("mount waiting for niynt\n");
+               dout("mount waiting for mount\n");
                err = wait_event_interruptible_timeout(client->mount_wq,
                               client->mount_err || have_mon_map(client),
-                              request_interval);
+                              timeout);
                if (err == -EINTR || err == -ERESTARTSYS)
                        goto out;
                if (client->mount_err) {
@@ -768,7 +767,6 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
                }
        }
 
-
        dout("mount opening root\n");
        root = open_root_dentry(client, "", started);
        if (IS_ERR(root)) {