From: Sage Weil Date: Wed, 26 Aug 2009 18:51:58 +0000 (-0700) Subject: kclient: use connection msgr api for mon client X-Git-Tag: v0.14~115 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=60a3ed00a7b70948427a0c95a18e1771f54b8119;p=ceph.git kclient: use connection msgr api for mon client --- diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 1d18d4686155..72689adc08e4 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -321,6 +321,7 @@ static void put_connection(struct ceph_connection *con) void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con, struct ceph_entity_addr *addr) { + dout("con_init %p %u.%u.%u.%u:%u\n", con, IPQUADPORT(addr->ipaddr)); atomic_set(&con->nref, 1); con->msgr = msgr; INIT_LIST_HEAD(&con->list_all); diff --git a/src/kernel/mon_client.c b/src/kernel/mon_client.c index 24ecbd5f4888..9d80fbde32d1 100644 --- a/src/kernel/mon_client.c +++ b/src/kernel/mon_client.c @@ -88,18 +88,37 @@ int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr) } /* - * Choose a monitor. If @newmon >= 0, try to choose a different - * monitor than last time. + * Open a session with a (new) monitor. */ -static int pick_mon(struct ceph_mon_client *monc, int newmon) +static int open_session(struct ceph_mon_client *monc, int newmon) { char r; - if (!newmon && monc->last_mon >= 0) - return monc->last_mon; + if (!newmon && monc->con) { + dout("open_session mon%d already open\n", monc->last_mon); + return 0; + } + + if (monc->con) { + dout("open_session closing mon%d\n", monc->last_mon); + monc->con->put(monc->con); + } + get_random_bytes(&r, 1); monc->last_mon = r % monc->monmap->num_mon; - return monc->last_mon; + + monc->con = kzalloc(sizeof(*monc->con), GFP_NOFS); + if (!monc->con) { + pr_err("open_session mon%d ENOMEM\n", monc->last_mon); + return -ENOMEM; + } + + dout("open_session mon%d opened\n", monc->last_mon); + ceph_con_init(monc->client->msgr, monc->con, + &monc->monmap->mon_inst[monc->last_mon].addr); + monc->con->peer_name.type = cpu_to_le32(CEPH_ENTITY_TYPE_MON); + monc->con->peer_name.num = cpu_to_le32(monc->last_mon); + return 0; } /* @@ -157,17 +176,19 @@ static void request_mdsmap(struct ceph_mon_client *monc, int newmon) { struct ceph_msg *msg; struct ceph_mds_getmap *h; - int mon = pick_mon(monc, newmon); + int err; - dout("request_mdsmap from mon%d want %u\n", mon, monc->want_mdsmap); + dout("request_mdsmap want %u\n", monc->want_mdsmap); + err = open_session(monc, newmon); + if (err) + return; msg = ceph_msg_new(CEPH_MSG_MDS_GETMAP, sizeof(*h), 0, 0, NULL); if (IS_ERR(msg)) return; h = msg->front.iov_base; h->fsid = monc->monmap->fsid; h->have_version = cpu_to_le64(monc->want_mdsmap - 1); - msg->hdr.dst = monc->monmap->mon_inst[mon]; - ceph_msg_send(monc->client->msgr, msg, 0); + ceph_con_send(monc->con, msg); } /* @@ -214,9 +235,12 @@ static void request_osdmap(struct ceph_mon_client *monc, int newmon) { struct ceph_msg *msg; struct ceph_osd_getmap *h; - int mon = pick_mon(monc, newmon); + int err; - dout("request_osdmap from mon%d want %u\n", mon, monc->want_osdmap); + dout("request_osdmap want %u\n", monc->want_osdmap); + err = open_session(monc, newmon); + if (err) + return; msg = ceph_msg_new(CEPH_MSG_OSD_GETMAP, sizeof(*h), 0, 0, NULL); if (IS_ERR(msg)) return; @@ -225,8 +249,7 @@ static void request_osdmap(struct ceph_mon_client *monc, int newmon) h->start = cpu_to_le32(monc->want_osdmap); h->have_version = cpu_to_le64(monc->want_osdmap ? monc->want_osdmap-1 : 0); - msg->hdr.dst = monc->monmap->mon_inst[mon]; - ceph_msg_send(monc->client->msgr, msg, 0); + ceph_con_send(monc->con, msg); } void ceph_monc_request_osdmap(struct ceph_mon_client *monc, u32 want) @@ -258,23 +281,140 @@ int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got) } +/* + * mount + */ +static void request_mount(struct ceph_mon_client *monc, int newmon) +{ + struct ceph_msg *msg; + struct ceph_client_mount *h; + int err; + + dout("request_mount\n"); + err = open_session(monc, newmon); + if (err) + return; + msg = ceph_msg_new(CEPH_MSG_CLIENT_MOUNT, sizeof(*h), 0, 0, NULL); + if (IS_ERR(msg)) + return; + h = msg->front.iov_base; + h->have_version = 0; + ceph_con_send(monc->con, msg); +} + +void ceph_monc_request_mount(struct ceph_mon_client *monc) +{ + mutex_lock(&monc->req_mutex); + monc->mountreq.delay = BASE_DELAY_INTERVAL; + request_mount(monc, 0); + reschedule_timeout(&monc->mountreq); + mutex_unlock(&monc->req_mutex); +} + +/* + * The monitor responds with mount ack indicate mount success. The + * included client ticket allows the client to talk to MDSs and OSDs. + */ +void ceph_handle_mount_ack(struct ceph_client *client, struct ceph_msg *msg) +{ + struct ceph_mon_client *monc = &client->monc; + struct ceph_monmap *monmap = NULL, *old = client->monc.monmap; + void *p, *end; + s32 result; + u32 len; + int err = -EINVAL; + + if (client->signed_ticket) { + dout("handle_mount_ack - already mounted\n"); + return; + } + + dout("handle_mount_ack\n"); + p = msg->front.iov_base; + end = p + msg->front.iov_len; + + ceph_decode_32_safe(&p, end, result, bad); + ceph_decode_32_safe(&p, end, len, bad); + if (result) { + pr_err("ceph mount denied: %.*s (%d)\n", len, (char *)p, + result); + err = result; + goto out; + } + p += len; + + ceph_decode_32_safe(&p, end, len, bad); + ceph_decode_need(&p, end, len, bad); + monmap = ceph_monmap_decode(p, p + len); + if (IS_ERR(monmap)) { + pr_err("ceph problem decoding monmap, %d\n", + (int)PTR_ERR(monmap)); + err = -EINVAL; + goto out; + } + p += len; + + ceph_decode_32_safe(&p, end, len, bad); + dout("ticket len %d\n", len); + ceph_decode_need(&p, end, len, bad); + + client->signed_ticket = kmalloc(len, GFP_KERNEL); + if (!client->signed_ticket) { + pr_err("ceph ENOMEM allocating %d bytes for client ticket\n", + len); + err = -ENOMEM; + goto out_free; + } + + memcpy(client->signed_ticket, p, len); + client->signed_ticket_len = len; + + client->monc.monmap = monmap; + kfree(old); + + client->whoami = le32_to_cpu(msg->hdr.dst.name.num); + client->msgr->inst.name = msg->hdr.dst.name; + pr_info("ceph mount as client%d fsid is %llx.%llx\n", client->whoami, + le64_to_cpu(__ceph_fsid_major(&client->monc.monmap->fsid)), + le64_to_cpu(__ceph_fsid_minor(&client->monc.monmap->fsid))); + ceph_debugfs_client_init(client); + + err = 0; + goto out; + +bad: + pr_err("ceph error decoding mount_ack message\n"); +out_free: + kfree(monmap); +out: + client->mount_err = err; + mutex_lock(&monc->req_mutex); + cancel_timeout(&monc->mountreq); + mutex_unlock(&monc->req_mutex); + wake_up(&client->mount_wq); +} + + + /* * umount */ static void request_umount(struct ceph_mon_client *monc, int newmon) { struct ceph_msg *msg; - int mon = pick_mon(monc, newmon); struct ceph_client_mount *h; + int err; - dout("request_umount from mon%d\n", mon); + dout("request_umount\n"); + err = open_session(monc, newmon); + if (err) + return; msg = ceph_msg_new(CEPH_MSG_CLIENT_UNMOUNT, sizeof(*h), 0, 0, NULL); if (IS_ERR(msg)) return; h = msg->front.iov_base; h->have_version = 0; - msg->hdr.dst = monc->monmap->mon_inst[mon]; - ceph_msg_send(monc->client->msgr, msg, 0); + ceph_con_send(monc->con, msg); } void ceph_monc_request_umount(struct ceph_mon_client *monc) @@ -343,9 +483,12 @@ static int send_statfs(struct ceph_mon_client *monc, { struct ceph_msg *msg; struct ceph_mon_statfs *h; - int mon = pick_mon(monc, newmon ? 1 : -1); + int err; - dout("send_statfs to mon%d tid %llu\n", mon, req->tid); + dout("send_statfs tid %llu\n", req->tid); + err = open_session(monc, newmon); + if (err) + return err; msg = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), 0, 0, NULL); if (IS_ERR(msg)) return PTR_ERR(msg); @@ -354,8 +497,7 @@ static int send_statfs(struct ceph_mon_client *monc, h->have_version = 0; h->fsid = monc->monmap->fsid; h->tid = cpu_to_le64(req->tid); - msg->hdr.dst = monc->monmap->mon_inst[mon]; - ceph_msg_send(monc->client->msgr, msg, 0); + ceph_con_send(monc->con, msg); return 0; } @@ -450,11 +592,7 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) dout("init\n"); memset(monc, 0, sizeof(*monc)); monc->client = cl; - monc->monmap = kzalloc(sizeof(struct ceph_monmap) + - sizeof(struct ceph_entity_addr) * CEPH_MAX_MON_MOUNT_ADDR, - GFP_KERNEL); - if (monc->monmap == NULL) - return -ENOMEM; + monc->monmap = NULL; mutex_init(&monc->statfs_mutex); INIT_RADIX_TREE(&monc->statfs_request_tree, GFP_NOFS); monc->num_statfs_requests = 0; @@ -462,10 +600,12 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) INIT_DELAYED_WORK(&monc->statfs_delayed_work, do_statfs_check); init_request_type(monc, &monc->mdsreq, request_mdsmap); init_request_type(monc, &monc->osdreq, request_osdmap); + init_request_type(monc, &monc->mountreq, request_mount); init_request_type(monc, &monc->umountreq, request_umount); mutex_init(&monc->req_mutex); monc->want_mdsmap = 0; monc->want_osdmap = 0; + monc->con = NULL; return 0; } @@ -474,7 +614,12 @@ void ceph_monc_stop(struct ceph_mon_client *monc) dout("stop\n"); cancel_timeout(&monc->mdsreq); cancel_timeout(&monc->osdreq); + cancel_timeout(&monc->mountreq); cancel_timeout(&monc->umountreq); cancel_delayed_work_sync(&monc->statfs_delayed_work); + if (monc->con) { + monc->con->put(monc->con); + monc->con = NULL; + } kfree(monc->monmap); } diff --git a/src/kernel/mon_client.h b/src/kernel/mon_client.h index f7237c2fcd0e..3685a031c8a0 100644 --- a/src/kernel/mon_client.h +++ b/src/kernel/mon_client.h @@ -54,6 +54,8 @@ struct ceph_mon_client { int last_mon; /* last monitor i contacted */ struct ceph_monmap *monmap; + struct ceph_connection *con; + /* pending statfs requests */ struct mutex statfs_mutex; struct radix_tree_root statfs_request_tree; @@ -63,7 +65,7 @@ struct ceph_mon_client { /* mds/osd map or umount requests */ struct mutex req_mutex; - struct ceph_mon_request mdsreq, osdreq, umountreq; + struct ceph_mon_request mdsreq, osdreq, mountreq, umountreq; u32 want_mdsmap; u32 want_osdmap; @@ -89,6 +91,9 @@ extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 have); extern void ceph_monc_request_osdmap(struct ceph_mon_client *monc, u32 want); extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have); +extern void ceph_monc_request_mount(struct ceph_mon_client *monc); +extern void ceph_handle_mount_ack(struct ceph_client *client, + struct ceph_msg *msg); extern void ceph_monc_request_umount(struct ceph_mon_client *monc); extern int ceph_monc_do_statfs(struct ceph_mon_client *monc, diff --git a/src/kernel/super.c b/src/kernel/super.c index 590751e94f2c..f4a45c837140 100644 --- a/src/kernel/super.c +++ b/src/kernel/super.c @@ -264,80 +264,6 @@ static const struct super_operations ceph_super_ops = { }; - -/* - * The monitor responds with mount ack indicate mount success. The - * included client ticket allows the client to talk to MDSs and OSDs. - */ -static int handle_mount_ack(struct ceph_client *client, struct ceph_msg *msg) -{ - struct ceph_monmap *monmap = NULL, *old = client->monc.monmap; - void *p, *end; - s32 result; - u32 len; - int err = -EINVAL; - - if (client->signed_ticket) { - dout("handle_mount_ack - already mounted\n"); - return 0; - } - - dout("handle_mount_ack\n"); - p = msg->front.iov_base; - end = p + msg->front.iov_len; - - ceph_decode_32_safe(&p, end, result, bad); - ceph_decode_32_safe(&p, end, len, bad); - if (result) { - pr_err("ceph mount denied: %.*s (%d)\n", len, (char *)p, - result); - return result; - } - p += len; - - ceph_decode_32_safe(&p, end, len, bad); - ceph_decode_need(&p, end, len, bad); - monmap = ceph_monmap_decode(p, p + len); - if (IS_ERR(monmap)) { - pr_err("ceph problem decoding monmap, %d\n", - (int)PTR_ERR(monmap)); - return -EINVAL; - } - p += len; - - ceph_decode_32_safe(&p, end, len, bad); - dout("ticket len %d\n", len); - ceph_decode_need(&p, end, len, bad); - - client->signed_ticket = kmalloc(len, GFP_KERNEL); - if (!client->signed_ticket) { - pr_err("ceph ENOMEM allocating %d bytes for client ticket\n", - len); - err = -ENOMEM; - goto out; - } - - memcpy(client->signed_ticket, p, len); - client->signed_ticket_len = len; - - client->monc.monmap = monmap; - kfree(old); - - client->whoami = le32_to_cpu(msg->hdr.dst.name.num); - client->msgr->inst.name = msg->hdr.dst.name; - pr_info("ceph mount as client%d fsid is %llx.%llx\n", client->whoami, - le64_to_cpu(__ceph_fsid_major(&client->monc.monmap->fsid)), - le64_to_cpu(__ceph_fsid_minor(&client->monc.monmap->fsid))); - ceph_debugfs_client_init(client); - return 0; - -bad: - pr_err("ceph error decoding mount_ack message\n"); -out: - kfree(monmap); - return err; -} - const char *ceph_msg_type_name(int type) { switch (type) { @@ -507,12 +433,16 @@ bad: return -EINVAL; } -static int parse_mount_args(int flags, char *options, const char *dev_name, - struct ceph_mount_args *args, const char **path) +static int parse_mount_args(struct ceph_client *client, + int flags, char *options, const char *dev_name, + const char **path) { + struct ceph_mount_args *args = &client->mount_args; const char *c; int err; substring_t argstr[MAX_OPT_ARGS]; + int num_mon; + struct ceph_entity_addr mon_addr[CEPH_MAX_MON_MOUNT_ADDR]; int i; dout("parse_mount_args dev_name '%s'\n", dev_name); @@ -540,16 +470,28 @@ static int parse_mount_args(int flags, char *options, const char *dev_name, } /* get mon ip(s) */ - err = parse_ips(dev_name, *path, args->mon_addr, - CEPH_MAX_MON_MOUNT_ADDR, &args->num_mon); + err = parse_ips(dev_name, *path, mon_addr, + CEPH_MAX_MON_MOUNT_ADDR, &num_mon); if (err < 0) return err; - for (i = 0; i < args->num_mon; i++) { - args->mon_addr[i].ipaddr.sin_family = AF_INET; - args->mon_addr[i].erank = 0; - args->mon_addr[i].nonce = 0; + /* build initial monmap */ + client->monc.monmap = kzalloc(sizeof(*client->monc.monmap) + + num_mon*sizeof(client->monc.monmap->mon_inst[0]), + GFP_KERNEL); + if (!client->monc.monmap) + return -ENOMEM; + for (i = 0; i < num_mon; i++) { + client->monc.monmap->mon_inst[i].addr = mon_addr[i]; + client->monc.monmap->mon_inst[i].addr.ipaddr.sin_family = + AF_INET; + client->monc.monmap->mon_inst[i].addr.erank = 0; + client->monc.monmap->mon_inst[i].addr.nonce = 0; + client->monc.monmap->mon_inst[i].name.type = + cpu_to_le32(CEPH_ENTITY_TYPE_MON); + client->monc.monmap->mon_inst[i].name.num = cpu_to_le32(i); } + client->monc.monmap->num_mon = num_mon; args->my_addr.ipaddr.sin_family = AF_INET; args->my_addr.ipaddr.sin_addr.s_addr = htonl(0); args->my_addr.ipaddr.sin_port = htons(0); @@ -822,15 +764,11 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt, const char *path) { struct ceph_entity_addr *myaddr = NULL; - struct ceph_msg *mount_msg; int err; int request_interval = 5 * HZ; unsigned long timeout = client->mount_args.mount_timeout * HZ; unsigned long started = jiffies; /* note the start time */ - int which; struct dentry *root; - unsigned char r; - struct ceph_client_mount *h; dout("mount start\n"); mutex_lock(&client->mount_mutex); @@ -854,30 +792,14 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt, } /* send mount request, and wait for mon, mds, and osd maps */ + ceph_monc_request_mount(&client->monc); while (!have_mon_map(client) && !client->mount_err) { err = -EIO; if (timeout && time_after_eq(jiffies, started + timeout)) goto out; - dout("mount sending mount request\n"); - get_random_bytes(&r, 1); - which = r % client->mount_args.num_mon; - mount_msg = ceph_msg_new(CEPH_MSG_CLIENT_MOUNT, sizeof(*h), 0, - 0, NULL); - if (IS_ERR(mount_msg)) { - err = PTR_ERR(mount_msg); - goto out; - } - h = mount_msg->front.iov_base; - h->have_version = 0; - mount_msg->hdr.dst.name.type = - cpu_to_le32(CEPH_ENTITY_TYPE_MON); - mount_msg->hdr.dst.name.num = cpu_to_le32(which); - mount_msg->hdr.dst.addr = client->mount_args.mon_addr[which]; - - ceph_msg_send(client->msgr, mount_msg, 0); /* wait */ - dout("mount sent to mon%d, waiting for mon map\n", which); + dout("mount waiting for niynt\n"); err = wait_event_interruptible_timeout(client->mount_wq, client->mount_err || have_mon_map(client), request_interval); @@ -1025,8 +947,7 @@ void ceph_dispatch(void *p, struct ceph_msg *msg) switch (type) { case CEPH_MSG_CLIENT_MOUNT_ACK: - client->mount_err = handle_mount_ack(client, msg); - wake_up(&client->mount_wq); + ceph_handle_mount_ack(client, msg); break; /* mon client */ @@ -1125,11 +1046,11 @@ static int ceph_compare_super(struct super_block *sb, void *data) } } else { /* do we share (a) monitor? */ - for (i = 0; i < args->num_mon; i++) + for (i = 0; i < new->monc.monmap->num_mon; i++) if (ceph_monmap_contains(other->monc.monmap, - &args->mon_addr[i])) + &new->monc.monmap->mon_inst[i].addr)) break; - if (i == args->num_mon) { + if (i == new->monc.monmap->num_mon) { dout("mon ip not part of monmap\n"); return 0; } @@ -1187,8 +1108,7 @@ static int ceph_get_sb(struct file_system_type *fs_type, if (IS_ERR(client)) return PTR_ERR(client); - err = parse_mount_args(flags, data, dev_name, - &client->mount_args, &path); + err = parse_mount_args(client, flags, data, dev_name, &path); if (err < 0) goto out; diff --git a/src/kernel/super.h b/src/kernel/super.h index 561adf7e83b5..5d5c433fd268 100644 --- a/src/kernel/super.h +++ b/src/kernel/super.h @@ -55,8 +55,6 @@ struct ceph_mount_args { int caps_wanted_delay_min, caps_wanted_delay_max; ceph_fsid_t fsid; struct ceph_entity_addr my_addr; - int num_mon; - struct ceph_entity_addr mon_addr[CEPH_MAX_MON_MOUNT_ADDR]; int wsize; int rsize; /* max readahead */ int max_readdir; /* max readdir size */