]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: use connection msgr api for mon client
authorSage Weil <sage@newdream.net>
Wed, 26 Aug 2009 18:51:58 +0000 (11:51 -0700)
committerSage Weil <sage@newdream.net>
Wed, 26 Aug 2009 21:33:33 +0000 (14:33 -0700)
src/kernel/messenger.c
src/kernel/mon_client.c
src/kernel/mon_client.h
src/kernel/super.c
src/kernel/super.h

index 1d18d468615591efabcb79001e39a499ba0c9dff..72689adc08e49eed5d1a1d9f822ac87e2c794b6c 100644 (file)
@@ -321,6 +321,7 @@ static void put_connection(struct ceph_connection *con)
 void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con,
                   struct ceph_entity_addr *addr)
 {
+       dout("con_init %p %u.%u.%u.%u:%u\n", con, IPQUADPORT(addr->ipaddr));
        atomic_set(&con->nref, 1);
        con->msgr = msgr;
        INIT_LIST_HEAD(&con->list_all);
index 24ecbd5f4888efa903333ef752363b2e3cfa137c..9d80fbde32d19b63bbb49345aaf88b38ee70b5b8 100644 (file)
@@ -88,18 +88,37 @@ int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
 }
 
 /*
- * Choose a monitor.  If @newmon >= 0, try to choose a different
- * monitor than last time.
+ * Open a session with a (new) monitor.
  */
-static int pick_mon(struct ceph_mon_client *monc, int newmon)
+static int open_session(struct ceph_mon_client *monc, int newmon)
 {
        char r;
 
-       if (!newmon && monc->last_mon >= 0)
-               return monc->last_mon;
+       if (!newmon && monc->con) {
+               dout("open_session mon%d already open\n", monc->last_mon);
+               return 0;
+       }
+
+       if (monc->con) {
+               dout("open_session closing mon%d\n", monc->last_mon);
+               monc->con->put(monc->con);
+       }
+
        get_random_bytes(&r, 1);
        monc->last_mon = r % monc->monmap->num_mon;
-       return monc->last_mon;
+
+       monc->con = kzalloc(sizeof(*monc->con), GFP_NOFS);
+       if (!monc->con) {
+               pr_err("open_session mon%d ENOMEM\n", monc->last_mon);
+               return -ENOMEM;
+       }
+
+       dout("open_session mon%d opened\n", monc->last_mon);
+       ceph_con_init(monc->client->msgr, monc->con,
+                     &monc->monmap->mon_inst[monc->last_mon].addr);
+       monc->con->peer_name.type = cpu_to_le32(CEPH_ENTITY_TYPE_MON);
+       monc->con->peer_name.num = cpu_to_le32(monc->last_mon);
+       return 0;
 }
 
 /*
@@ -157,17 +176,19 @@ static void request_mdsmap(struct ceph_mon_client *monc, int newmon)
 {
        struct ceph_msg *msg;
        struct ceph_mds_getmap *h;
-       int mon = pick_mon(monc, newmon);
+       int err;
 
-       dout("request_mdsmap from mon%d want %u\n", mon, monc->want_mdsmap);
+       dout("request_mdsmap want %u\n", monc->want_mdsmap);
+       err = open_session(monc, newmon);
+       if (err)
+               return;
        msg = ceph_msg_new(CEPH_MSG_MDS_GETMAP, sizeof(*h), 0, 0, NULL);
        if (IS_ERR(msg))
                return;
        h = msg->front.iov_base;
        h->fsid = monc->monmap->fsid;
        h->have_version = cpu_to_le64(monc->want_mdsmap - 1);
-       msg->hdr.dst = monc->monmap->mon_inst[mon];
-       ceph_msg_send(monc->client->msgr, msg, 0);
+       ceph_con_send(monc->con, msg);
 }
 
 /*
@@ -214,9 +235,12 @@ static void request_osdmap(struct ceph_mon_client *monc, int newmon)
 {
        struct ceph_msg *msg;
        struct ceph_osd_getmap *h;
-       int mon = pick_mon(monc, newmon);
+       int err;
 
-       dout("request_osdmap from mon%d want %u\n", mon, monc->want_osdmap);
+       dout("request_osdmap want %u\n", monc->want_osdmap);
+       err = open_session(monc, newmon);
+       if (err)
+               return;
        msg = ceph_msg_new(CEPH_MSG_OSD_GETMAP, sizeof(*h), 0, 0, NULL);
        if (IS_ERR(msg))
                return;
@@ -225,8 +249,7 @@ static void request_osdmap(struct ceph_mon_client *monc, int newmon)
        h->start = cpu_to_le32(monc->want_osdmap);
        h->have_version = cpu_to_le64(monc->want_osdmap ?
                                      monc->want_osdmap-1 : 0);
-       msg->hdr.dst = monc->monmap->mon_inst[mon];
-       ceph_msg_send(monc->client->msgr, msg, 0);
+       ceph_con_send(monc->con, msg);
 }
 
 void ceph_monc_request_osdmap(struct ceph_mon_client *monc, u32 want)
@@ -258,23 +281,140 @@ int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
 }
 
 
+/*
+ * mount
+ */
+static void request_mount(struct ceph_mon_client *monc, int newmon)
+{
+       struct ceph_msg *msg;
+       struct ceph_client_mount *h;
+       int err;
+
+       dout("request_mount\n");
+       err = open_session(monc, newmon);
+       if (err)
+               return;
+       msg = ceph_msg_new(CEPH_MSG_CLIENT_MOUNT, sizeof(*h), 0, 0, NULL);
+       if (IS_ERR(msg))
+               return;
+       h = msg->front.iov_base;
+       h->have_version = 0;
+       ceph_con_send(monc->con, msg);
+}
+
+void ceph_monc_request_mount(struct ceph_mon_client *monc)
+{
+       mutex_lock(&monc->req_mutex);
+       monc->mountreq.delay = BASE_DELAY_INTERVAL;
+       request_mount(monc, 0);
+       reschedule_timeout(&monc->mountreq);
+       mutex_unlock(&monc->req_mutex);
+}
+
+/*
+ * The monitor responds with mount ack indicate mount success.  The
+ * included client ticket allows the client to talk to MDSs and OSDs.
+ */
+void ceph_handle_mount_ack(struct ceph_client *client, struct ceph_msg *msg)
+{
+       struct ceph_mon_client *monc = &client->monc;
+       struct ceph_monmap *monmap = NULL, *old = client->monc.monmap;
+       void *p, *end;
+       s32 result;
+       u32 len;
+       int err = -EINVAL;
+
+       if (client->signed_ticket) {
+               dout("handle_mount_ack - already mounted\n");
+               return;
+       }
+
+       dout("handle_mount_ack\n");
+       p = msg->front.iov_base;
+       end = p + msg->front.iov_len;
+
+       ceph_decode_32_safe(&p, end, result, bad);
+       ceph_decode_32_safe(&p, end, len, bad);
+       if (result) {
+               pr_err("ceph mount denied: %.*s (%d)\n", len, (char *)p,
+                      result);
+               err = result;
+               goto out;
+       }
+       p += len;
+
+       ceph_decode_32_safe(&p, end, len, bad);
+       ceph_decode_need(&p, end, len, bad);
+       monmap = ceph_monmap_decode(p, p + len);
+       if (IS_ERR(monmap)) {
+               pr_err("ceph problem decoding monmap, %d\n",
+                      (int)PTR_ERR(monmap));
+               err = -EINVAL;
+               goto out;
+       }
+       p += len;
+
+       ceph_decode_32_safe(&p, end, len, bad);
+       dout("ticket len %d\n", len);
+       ceph_decode_need(&p, end, len, bad);
+
+       client->signed_ticket = kmalloc(len, GFP_KERNEL);
+       if (!client->signed_ticket) {
+               pr_err("ceph ENOMEM allocating %d bytes for client ticket\n",
+                      len);
+               err = -ENOMEM;
+               goto out_free;
+       }
+
+       memcpy(client->signed_ticket, p, len);
+       client->signed_ticket_len = len;
+
+       client->monc.monmap = monmap;
+       kfree(old);
+
+       client->whoami = le32_to_cpu(msg->hdr.dst.name.num);
+       client->msgr->inst.name = msg->hdr.dst.name;
+       pr_info("ceph mount as client%d fsid is %llx.%llx\n", client->whoami,
+               le64_to_cpu(__ceph_fsid_major(&client->monc.monmap->fsid)),
+               le64_to_cpu(__ceph_fsid_minor(&client->monc.monmap->fsid)));
+       ceph_debugfs_client_init(client);
+
+       err = 0;
+       goto out;
+
+bad:
+       pr_err("ceph error decoding mount_ack message\n");
+out_free:
+       kfree(monmap);
+out:
+       client->mount_err = err;
+       mutex_lock(&monc->req_mutex);
+       cancel_timeout(&monc->mountreq);
+       mutex_unlock(&monc->req_mutex);
+       wake_up(&client->mount_wq);
+}
+
+
+
 /*
  * umount
  */
 static void request_umount(struct ceph_mon_client *monc, int newmon)
 {
        struct ceph_msg *msg;
-       int mon = pick_mon(monc, newmon);
        struct ceph_client_mount *h;
+       int err;
 
-       dout("request_umount from mon%d\n", mon);
+       dout("request_umount\n");
+       err = open_session(monc, newmon);
+       if (err)
+               return;
        msg = ceph_msg_new(CEPH_MSG_CLIENT_UNMOUNT, sizeof(*h), 0, 0, NULL);
        if (IS_ERR(msg))
                return;
        h = msg->front.iov_base;
        h->have_version = 0;
-       msg->hdr.dst = monc->monmap->mon_inst[mon];
-       ceph_msg_send(monc->client->msgr, msg, 0);
+       ceph_con_send(monc->con, msg);
 }
 
 void ceph_monc_request_umount(struct ceph_mon_client *monc)
@@ -343,9 +483,12 @@ static int send_statfs(struct ceph_mon_client *monc,
 {
        struct ceph_msg *msg;
        struct ceph_mon_statfs *h;
-       int mon = pick_mon(monc, newmon ? 1 : -1);
+       int err;
 
-       dout("send_statfs to mon%d tid %llu\n", mon, req->tid);
+       dout("send_statfs tid %llu\n", req->tid);
+       err = open_session(monc, newmon);
+       if (err)
+               return err;
        msg = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), 0, 0, NULL);
        if (IS_ERR(msg))
                return PTR_ERR(msg);
@@ -354,8 +497,7 @@ static int send_statfs(struct ceph_mon_client *monc,
        h->have_version = 0;
        h->fsid = monc->monmap->fsid;
        h->tid = cpu_to_le64(req->tid);
-       msg->hdr.dst = monc->monmap->mon_inst[mon];
-       ceph_msg_send(monc->client->msgr, msg, 0);
+       ceph_con_send(monc->con, msg);
        return 0;
 }
 
@@ -450,11 +592,7 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
        dout("init\n");
        memset(monc, 0, sizeof(*monc));
        monc->client = cl;
-       monc->monmap = kzalloc(sizeof(struct ceph_monmap) +
-              sizeof(struct ceph_entity_addr) * CEPH_MAX_MON_MOUNT_ADDR,
-              GFP_KERNEL);
-       if (monc->monmap == NULL)
-               return -ENOMEM;
+       monc->monmap = NULL;
        mutex_init(&monc->statfs_mutex);
        INIT_RADIX_TREE(&monc->statfs_request_tree, GFP_NOFS);
        monc->num_statfs_requests = 0;
@@ -462,10 +600,12 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
        INIT_DELAYED_WORK(&monc->statfs_delayed_work, do_statfs_check);
        init_request_type(monc, &monc->mdsreq, request_mdsmap);
        init_request_type(monc, &monc->osdreq, request_osdmap);
+       init_request_type(monc, &monc->mountreq, request_mount);
        init_request_type(monc, &monc->umountreq, request_umount);
        mutex_init(&monc->req_mutex);
        monc->want_mdsmap = 0;
        monc->want_osdmap = 0;
+       monc->con = NULL;
        return 0;
 }
 
@@ -474,7 +614,12 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
        dout("stop\n");
        cancel_timeout(&monc->mdsreq);
        cancel_timeout(&monc->osdreq);
+       cancel_timeout(&monc->mountreq);
        cancel_timeout(&monc->umountreq);
        cancel_delayed_work_sync(&monc->statfs_delayed_work);
+       if (monc->con) {
+               monc->con->put(monc->con);
+               monc->con = NULL;
+       }
        kfree(monc->monmap);
 }
index f7237c2fcd0ebed092e3e6224abf3048aafb2c8f..3685a031c8a02e10c63ce25d655a4b56a0384d94 100644 (file)
@@ -54,6 +54,8 @@ struct ceph_mon_client {
        int last_mon;                       /* last monitor i contacted */
        struct ceph_monmap *monmap;
 
+       struct ceph_connection *con;
+
        /* pending statfs requests */
        struct mutex statfs_mutex;
        struct radix_tree_root statfs_request_tree;
@@ -63,7 +65,7 @@ struct ceph_mon_client {
 
        /* mds/osd map or umount requests */
        struct mutex req_mutex;
-       struct ceph_mon_request mdsreq, osdreq, umountreq;
+       struct ceph_mon_request mdsreq, osdreq, mountreq, umountreq;
        u32 want_mdsmap;
        u32 want_osdmap;
 
@@ -89,6 +91,9 @@ extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 have);
 extern void ceph_monc_request_osdmap(struct ceph_mon_client *monc, u32 want);
 extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have);
 
+extern void ceph_monc_request_mount(struct ceph_mon_client *monc);
+extern void ceph_handle_mount_ack(struct ceph_client *client,
+                                 struct ceph_msg *msg);
 extern void ceph_monc_request_umount(struct ceph_mon_client *monc);
 
 extern int ceph_monc_do_statfs(struct ceph_mon_client *monc,
index 590751e94f2c0081799bd7ca735fb53340920fa8..f4a45c8371408e270b5e3aa6b4173f9e5cb538ca 100644 (file)
@@ -264,80 +264,6 @@ static const struct super_operations ceph_super_ops = {
 };
 
 
-
-/*
- * The monitor responds with mount ack indicate mount success.  The
- * included client ticket allows the client to talk to MDSs and OSDs.
- */
-static int handle_mount_ack(struct ceph_client *client, struct ceph_msg *msg)
-{
-       struct ceph_monmap *monmap = NULL, *old = client->monc.monmap;
-       void *p, *end;
-       s32 result;
-       u32 len;
-       int err = -EINVAL;
-
-       if (client->signed_ticket) {
-               dout("handle_mount_ack - already mounted\n");
-               return 0;
-       }
-
-       dout("handle_mount_ack\n");
-       p = msg->front.iov_base;
-       end = p + msg->front.iov_len;
-
-       ceph_decode_32_safe(&p, end, result, bad);
-       ceph_decode_32_safe(&p, end, len, bad);
-       if (result) {
-               pr_err("ceph mount denied: %.*s (%d)\n", len, (char *)p,
-                      result);
-               return result;
-       }
-       p += len;
-
-       ceph_decode_32_safe(&p, end, len, bad);
-       ceph_decode_need(&p, end, len, bad);
-       monmap = ceph_monmap_decode(p, p + len);
-       if (IS_ERR(monmap)) {
-               pr_err("ceph problem decoding monmap, %d\n",
-                      (int)PTR_ERR(monmap));
-               return -EINVAL;
-       }
-       p += len;
-
-       ceph_decode_32_safe(&p, end, len, bad);
-       dout("ticket len %d\n", len);
-       ceph_decode_need(&p, end, len, bad);
-
-       client->signed_ticket = kmalloc(len, GFP_KERNEL);
-       if (!client->signed_ticket) {
-               pr_err("ceph ENOMEM allocating %d bytes for client ticket\n",
-                      len);
-               err = -ENOMEM;
-               goto out;
-       }
-
-       memcpy(client->signed_ticket, p, len);
-       client->signed_ticket_len = len;
-
-       client->monc.monmap = monmap;
-       kfree(old);
-
-       client->whoami = le32_to_cpu(msg->hdr.dst.name.num);
-       client->msgr->inst.name = msg->hdr.dst.name;
-       pr_info("ceph mount as client%d fsid is %llx.%llx\n", client->whoami,
-               le64_to_cpu(__ceph_fsid_major(&client->monc.monmap->fsid)),
-               le64_to_cpu(__ceph_fsid_minor(&client->monc.monmap->fsid)));
-       ceph_debugfs_client_init(client);
-       return 0;
-
-bad:
-       pr_err("ceph error decoding mount_ack message\n");
-out:
-       kfree(monmap);
-       return err;
-}
-
 const char *ceph_msg_type_name(int type)
 {
        switch (type) {
@@ -507,12 +433,16 @@ bad:
        return -EINVAL;
 }
 
-static int parse_mount_args(int flags, char *options, const char *dev_name,
-                           struct ceph_mount_args *args, const char **path)
+static int parse_mount_args(struct ceph_client *client,
+                           int flags, char *options, const char *dev_name,
+                           const char **path)
 {
+       struct ceph_mount_args *args = &client->mount_args;
        const char *c;
        int err;
        substring_t argstr[MAX_OPT_ARGS];
+       int num_mon;
+       struct ceph_entity_addr mon_addr[CEPH_MAX_MON_MOUNT_ADDR];
        int i;
 
        dout("parse_mount_args dev_name '%s'\n", dev_name);
@@ -540,16 +470,28 @@ static int parse_mount_args(int flags, char *options, const char *dev_name,
        }
 
        /* get mon ip(s) */
-       err = parse_ips(dev_name, *path, args->mon_addr,
-                       CEPH_MAX_MON_MOUNT_ADDR, &args->num_mon);
+       err = parse_ips(dev_name, *path, mon_addr,
+                       CEPH_MAX_MON_MOUNT_ADDR, &num_mon);
        if (err < 0)
                return err;
 
-       for (i = 0; i < args->num_mon; i++) {
-               args->mon_addr[i].ipaddr.sin_family = AF_INET;
-               args->mon_addr[i].erank = 0;
-               args->mon_addr[i].nonce = 0;
+       /* build initial monmap */
+       client->monc.monmap = kzalloc(sizeof(*client->monc.monmap) + 
+                              num_mon*sizeof(client->monc.monmap->mon_inst[0]),
+                              GFP_KERNEL);
+       if (!client->monc.monmap)
+               return -ENOMEM;
+       for (i = 0; i < num_mon; i++) {
+               client->monc.monmap->mon_inst[i].addr = mon_addr[i];
+               client->monc.monmap->mon_inst[i].addr.ipaddr.sin_family =
+                       AF_INET;
+               client->monc.monmap->mon_inst[i].addr.erank = 0;
+               client->monc.monmap->mon_inst[i].addr.nonce = 0;
+               client->monc.monmap->mon_inst[i].name.type =
+                       cpu_to_le32(CEPH_ENTITY_TYPE_MON);
+               client->monc.monmap->mon_inst[i].name.num = cpu_to_le32(i);
        }
+       client->monc.monmap->num_mon = num_mon;
        args->my_addr.ipaddr.sin_family = AF_INET;
        args->my_addr.ipaddr.sin_addr.s_addr = htonl(0);
        args->my_addr.ipaddr.sin_port = htons(0);
@@ -822,15 +764,11 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
                      const char *path)
 {
        struct ceph_entity_addr *myaddr = NULL;
-       struct ceph_msg *mount_msg;
        int err;
        int request_interval = 5 * HZ;
        unsigned long timeout = client->mount_args.mount_timeout * HZ;
        unsigned long started = jiffies;  /* note the start time */
-       int which;
        struct dentry *root;
-       unsigned char r;
-       struct ceph_client_mount *h;
 
        dout("mount start\n");
        mutex_lock(&client->mount_mutex);
@@ -854,30 +792,14 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
        }
 
        /* send mount request, and wait for mon, mds, and osd maps */
+       ceph_monc_request_mount(&client->monc);
        while (!have_mon_map(client) && !client->mount_err) {
                err = -EIO;
                if (timeout && time_after_eq(jiffies, started + timeout))
                        goto out;
-               dout("mount sending mount request\n");
-               get_random_bytes(&r, 1);
-               which = r % client->mount_args.num_mon;
-               mount_msg = ceph_msg_new(CEPH_MSG_CLIENT_MOUNT, sizeof(*h), 0,
-                                        0, NULL);
-               if (IS_ERR(mount_msg)) {
-                       err = PTR_ERR(mount_msg);
-                       goto out;
-               }
-               h = mount_msg->front.iov_base;
-               h->have_version = 0;
-               mount_msg->hdr.dst.name.type =
-                       cpu_to_le32(CEPH_ENTITY_TYPE_MON);
-               mount_msg->hdr.dst.name.num = cpu_to_le32(which);
-               mount_msg->hdr.dst.addr = client->mount_args.mon_addr[which];
-
-               ceph_msg_send(client->msgr, mount_msg, 0);
 
                /* wait */
-               dout("mount sent to mon%d, waiting for mon map\n", which);
+               dout("mount waiting for niynt\n");
                err = wait_event_interruptible_timeout(client->mount_wq,
                               client->mount_err || have_mon_map(client),
                               request_interval);
@@ -1025,8 +947,7 @@ void ceph_dispatch(void *p, struct ceph_msg *msg)
 
        switch (type) {
        case CEPH_MSG_CLIENT_MOUNT_ACK:
-               client->mount_err = handle_mount_ack(client, msg);
-               wake_up(&client->mount_wq);
+               ceph_handle_mount_ack(client, msg);
                break;
 
                /* mon client */
@@ -1125,11 +1046,11 @@ static int ceph_compare_super(struct super_block *sb, void *data)
                }
        } else {
                /* do we share (a) monitor? */
-               for (i = 0; i < args->num_mon; i++)
+               for (i = 0; i < new->monc.monmap->num_mon; i++)
                        if (ceph_monmap_contains(other->monc.monmap,
-                                                &args->mon_addr[i]))
+                                        &new->monc.monmap->mon_inst[i].addr))
                                break;
-               if (i == args->num_mon) {
+               if (i == new->monc.monmap->num_mon) {
                        dout("mon ip not part of monmap\n");
                        return 0;
                }
@@ -1187,8 +1108,7 @@ static int ceph_get_sb(struct file_system_type *fs_type,
        if (IS_ERR(client))
                return PTR_ERR(client);
 
-       err = parse_mount_args(flags, data, dev_name,
-                              &client->mount_args, &path);
+       err = parse_mount_args(client, flags, data, dev_name, &path);
        if (err < 0)
                goto out;
 
index 561adf7e83b52dfef59e5143cb7fc11a15446134..5d5c433fd268631f24d977f39155096a222bd6dc 100644 (file)
@@ -55,8 +55,6 @@ struct ceph_mount_args {
        int caps_wanted_delay_min, caps_wanted_delay_max;
        ceph_fsid_t fsid;
        struct ceph_entity_addr my_addr;
-       int num_mon;
-       struct ceph_entity_addr mon_addr[CEPH_MAX_MON_MOUNT_ADDR];
        int wsize;
        int rsize;            /* max readahead */
        int max_readdir;      /* max readdir size */