From 425169f7f952dae44348b04f2fa4b251b586b4c1 Mon Sep 17 00:00:00 2001 From: sageweil Date: Fri, 16 Nov 2007 05:27:33 +0000 Subject: [PATCH] no more bufferlist; streamlined ceph_msg object instead git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2070 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/kernel/client.c | 16 +- trunk/ceph/kernel/client.h | 2 +- trunk/ceph/kernel/mds_client.c | 78 +++--- trunk/ceph/kernel/mds_client.h | 12 +- trunk/ceph/kernel/mdsmap.c | 36 ++- trunk/ceph/kernel/mdsmap.h | 4 +- trunk/ceph/kernel/messenger.c | 465 ++++++++++++++++++++------------- trunk/ceph/kernel/messenger.h | 95 +++++-- trunk/ceph/kernel/mon_client.h | 2 +- trunk/ceph/kernel/monmap.h | 3 +- trunk/ceph/kernel/osd_client.h | 4 +- trunk/ceph/kernel/super.c | 6 +- 12 files changed, 443 insertions(+), 280 deletions(-) diff --git a/trunk/ceph/kernel/client.c b/trunk/ceph/kernel/client.c index 89538a19695df..fb9dfdf7b2f47 100644 --- a/trunk/ceph/kernel/client.c +++ b/trunk/ceph/kernel/client.c @@ -39,7 +39,7 @@ static int mount(struct ceph_client *client, struct ceph_mount_args *args) int ret; int attempts = 10; - atomic_set(&client->mounting, 1); + client->mounting = 7; /* send mount request */ mount_msg = ceph_new_message(CEPH_MSG_CLIENT_MOUNT, 0); @@ -50,13 +50,13 @@ trymount: inst.name.type = CEPH_ENTITY_TYPE_MON; inst.name.num = get_random_int() % args->num_mon; inst.addr = args->mon_addr[inst.name.num]; - dout(1, "ceph_get_client requesting mount from mon%d, %d attempts left\n", + dout(1, "mount from mon%d, %d attempts left\n", inst.name.num, attempts); ceph_messenger_send(client->msgr, mount_msg, &inst); /* wait */ err = wait_event_interruptible_timeout(client->mounted_wq, - atomic_read(&client->mounting) == 0, + client->mounting == 0, 6*HZ); if (err == -EINTR) return err; @@ -85,9 +85,13 @@ static void handle_mon_map(struct ceph_client *client, struct ceph_message *msg) if (err != 0) return; - /* mounted! */ - client->whoami = msg->dst.name.num; - if (atomic_dec_and_test(&client->mounting)) + if (client->whoami < 0) { + client->whoami = msg->dst.name.num; + client->msgr->inst.name = msg->dst.name; + } + + clear_bit(4, &client->mounting); + if (client->mounting == 0) wake_up(&client->mount_wq); } diff --git a/trunk/ceph/kernel/client.h b/trunk/ceph/kernel/client.h index 40a850b43b914..0e6c62bff3d63 100644 --- a/trunk/ceph/kernel/client.h +++ b/trunk/ceph/kernel/client.h @@ -39,7 +39,7 @@ struct ceph_client { struct ceph_fsid fsid; atomic_t nref; - atomic_t mounting; + int mounting; /* map bitset; 4=mon, 2=mds, 1=osd map */ wait_queue_t mount_wq; struct ceph_messenger *msgr; /* messenger instance */ diff --git a/trunk/ceph/kernel/mds_client.c b/trunk/ceph/kernel/mds_client.c index 06dfe57dfcab7..9784edd972c0a 100644 --- a/trunk/ceph/kernel/mds_client.c +++ b/trunk/ceph/kernel/mds_client.c @@ -6,12 +6,12 @@ #include "messenger.h" -static void send_msg_mds(struct ceph_mds_client *mdsc, struct ceph_message *msg, int mds) +static void send_msg_mds(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int mds) { msg->hdr.dst.addr = *ceph_mdsmap_get_addr(mdsc->mdsmap, mds); msg->hdr.dst.name.type = CEPH_ENTITY_TYPE_MDS; msg->hdr.dst.name.num = mds; - ceph_messenger_send(mdsc->client->msgr, msg); + ceph_msg_send(mdsc->client->msgr, msg); } @@ -26,7 +26,7 @@ static void get_request(struct ceph_mds_request *req) static void put_request(struct ceph_mds_request *req) { if (atomic_dec_and_test(&req->r_ref)) { - ceph_put_msg(req->r_request); + ceph_msg_put(req->r_request); kfree(req); } } @@ -36,14 +36,14 @@ static void put_request(struct ceph_mds_request *req) * register an in-flight request */ static struct ceph_mds_request * -register_request(struct ceph_mds_client *mdsc, struct ceph_message *msg, int mds) +register_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int mds) { struct ceph_mds_request *req; req = kmalloc(sizeof(*req), GFP_KERNEL); req->r_request = msg; - ceph_get_msg(msg); /* grab reference */ + ceph_msg_get(msg); /* grab reference */ req->r_reply = 0; req->r_num_mds = 0; req->r_attempts = 0; @@ -116,23 +116,26 @@ static void unregister_session(struct ceph_mds_client *mdsc, int mds) mdsc->sessions[mds] = 0; } -static struct ceph_message *create_session_msg(__u32 op, __u64 seq) +static struct ceph_msg *create_session_msg(__u32 op, __u64 seq) { - struct ceph_message *msg; + struct ceph_msg *msg; + void *p; - msg = ceph_new_message(CEPH_MSG_CLIENT_SESSION, sizeof(__u32)+sizeof(__u64)); + msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(__u32)+sizeof(__u64), 0, 0); if (IS_ERR(msg)) return ERR_PTR(-ENOMEM); /* fixme */ - op = cpu_to_le32(op); - ceph_bl_append_copy(&msg->payload, &op, sizeof(op)); - seq = cpu_to_le64(op); - ceph_bl_append_copy(&msg->payload, &seq, sizeof(seq)); + p = msg->front.iov_base; + *(__le32*)p = cpu_to_le32(op); + p += sizeof(__le32); + *(__le64*)p = cpu_to_le64(seq); + p += sizeof(__le64); + return msg; } static void open_session(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, int mds) { - struct ceph_message *msg; + struct ceph_msg *msg; /* connect */ if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { @@ -147,19 +150,20 @@ static void open_session(struct ceph_mds_client *mdsc, struct ceph_mds_session * send_msg_mds(mdsc, msg, mds); } -void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_message *msg) +void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_msg *msg) { __u32 op; __u64 seq; int err; struct ceph_mds_session *session; - struct ceph_bufferlist_iterator bli = {0, 0}; int from = msg->hdr.src.name.num; - + void *p = msg->front.iov_base; + void *end = msg->front.iov_base + msg->front.iov_len; + /* decode */ - if ((err = ceph_bl_decode_32(&msg->payload, &bli, &op)) != 0) + if ((err = ceph_decode_32(&p, end, &op)) != 0) goto bad; - if ((err = ceph_bl_decode_64(&msg->payload, &bli, &seq)) != 0) + if ((err = ceph_decode_64(&p, end, &seq)) != 0) goto bad; /* handle */ @@ -194,7 +198,7 @@ void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_message spin_unlock(&mdsc->lock); out: - ceph_put_msg(msg); + ceph_msg_put(msg); return; bad: @@ -225,12 +229,12 @@ void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client) } -struct ceph_message * -ceph_mdsc_make_request(struct ceph_mds_client *mdsc, struct ceph_message *msg, int mds) +struct ceph_msg * +ceph_mdsc_make_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int mds) { struct ceph_mds_request *req; struct ceph_mds_session *session; - struct ceph_message *reply = 0; + struct ceph_msg *reply = 0; spin_lock(&mdsc->lock); req = register_request(mdsc, msg, mds); @@ -289,7 +293,7 @@ retry: } -void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_message *msg) +void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg) { struct ceph_mds_request *req; __u64 tid; @@ -314,21 +318,22 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_message *m put_request(req); } -void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_message *msg) +void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg) { struct ceph_mds_request *req; __u64 tid; __u32 next_mds; __u32 fwd_seq; int err; - struct ceph_bufferlist_iterator bli = {0, 0}; + void *p = msg->front.iov_base; + void *end = p + msg->front.iov_len; /* decode */ - if ((err = ceph_bl_decode_64(&msg->payload, &bli, &tid)) != 0) + if ((err = ceph_decode_64(&p, end, &tid)) != 0) goto bad; - if ((err = ceph_bl_decode_32(&msg->payload, &bli, &next_mds)) != 0) + if ((err = ceph_decode_32(&p, end, &next_mds)) != 0) goto bad; - if ((err = ceph_bl_decode_32(&msg->payload, &bli, &fwd_seq)) != 0) + if ((err = ceph_decode_32(&p, end, &fwd_seq)) != 0) goto bad; /* handle */ @@ -365,7 +370,7 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_message put_request(req); out: - ceph_put_msg(msg); + ceph_msg_put(msg); return; bad: @@ -374,25 +379,24 @@ bad: } -void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, - struct ceph_message *msg) +void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg) { - struct ceph_bufferlist_iterator bli; __u64 epoch; __u32 left; int err; + void *p = msg->front.iov_base; + void *end = p + msg->front.iov_len; - ceph_bl_iterator_init(&bli); - if ((err = ceph_bl_decode_64(&msg->payload, &bli, &epoch)) != 0) + if ((err = ceph_decode_64(&p, end, &epoch)) != 0) goto bad; - if ((err = ceph_bl_decode_32(&msg->payload, &bli, &left)) != 0) + if ((err = ceph_decode_32(&p, end, &left)) != 0) goto bad; dout(2, "ceph_mdsc_handle_map epoch %llu\n", epoch); spin_lock(&mdsc->lock); if (epoch > mdsc->mdsmap->m_epoch) { - ceph_mdsmap_decode(mdsc->mdsmap, &msg->payload, &bli); + ceph_mdsmap_decode(mdsc->mdsmap, &p, end); spin_unlock(&mdsc->lock); complete(&mdsc->map_waiters); } else { @@ -400,7 +404,7 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, } out: - ceph_put_msg(msg); + ceph_msg_put(msg); return; bad: dout(1, "corrupt map\n"); diff --git a/trunk/ceph/kernel/mds_client.h b/trunk/ceph/kernel/mds_client.h index f23c1814b16d6..67f3e9188b739 100644 --- a/trunk/ceph/kernel/mds_client.h +++ b/trunk/ceph/kernel/mds_client.h @@ -29,8 +29,8 @@ struct ceph_mds_session { struct ceph_mds_request { __u64 r_tid; - struct ceph_message *r_request; - struct ceph_message *r_reply; + struct ceph_msg *r_request; + struct ceph_msg *r_reply; __u32 r_mds[4]; /* set of mds's with whom request may be outstanding */ int r_num_mds; /* items in r_mds */ @@ -63,9 +63,9 @@ struct ceph_mds_client { extern void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client); -extern void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct ceph_message *msg, int mds); -extern void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_message *msg); -extern void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_message *msg); -extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_message *msg); +extern void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int mds); +extern void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg); +extern void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg); +extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg); #endif diff --git a/trunk/ceph/kernel/mdsmap.c b/trunk/ceph/kernel/mdsmap.c index f0990c1178689..ab38162e7bf5f 100644 --- a/trunk/ceph/kernel/mdsmap.c +++ b/trunk/ceph/kernel/mdsmap.c @@ -44,27 +44,25 @@ struct ceph_entity_addr *ceph_mdsmap_get_addr(struct ceph_mdsmap *m, int w) return &m->m_addr[w]; } -int ceph_mdsmap_decode(struct ceph_mdsmap *m, - struct ceph_bufferlist *bl, - struct ceph_bufferlist_iterator *bli) +int ceph_mdsmap_decode(struct ceph_mdsmap *m, void **p, void *end) { int i, n; __u32 mds; int err; - if ((err = ceph_bl_decode_64(bl, bli, &m->m_epoch)) != 0) + if ((err = ceph_decode_64(p, end, &m->m_epoch)) != 0) goto bad; - if ((err = ceph_bl_decode_64(bl, bli, &m->m_client_epoch)) != 0) + if ((err = ceph_decode_64(p, end, &m->m_client_epoch)) != 0) goto bad; - if ((err = ceph_bl_decode_32(bl, bli, &m->m_created.tv_sec)) != 0) + if ((err = ceph_decode_32(p, end, &m->m_created.tv_sec)) != 0) goto bad; - if ((err = ceph_bl_decode_32(bl, bli, &m->m_created.tv_usec)) != 0) + if ((err = ceph_decode_32(p, end, &m->m_created.tv_usec)) != 0) goto bad; - if ((err = ceph_bl_decode_32(bl, bli, &m->m_anchortable)) != 0) + if ((err = ceph_decode_32(p, end, &m->m_anchortable)) != 0) goto bad; - if ((err = ceph_bl_decode_32(bl, bli, &m->m_root)) != 0) + if ((err = ceph_decode_32(p, end, &m->m_root)) != 0) goto bad; - if ((err = ceph_bl_decode_32(bl, bli, &m->m_max_mds)) != 0) + if ((err = ceph_decode_32(p, end, &m->m_max_mds)) != 0) goto bad; m->m_addr = kmalloc(m->m_max_mds*sizeof(*m->m_addr), GFP_KERNEL); @@ -72,28 +70,28 @@ int ceph_mdsmap_decode(struct ceph_mdsmap *m, memset(m->m_state, 0, m->m_max_mds); /* state */ - if ((err = ceph_bl_decode_32(bl, bli, &n)) != 0) + if ((err = ceph_decode_32(p, end, &n)) != 0) goto bad; for (i=0; im_state[mds])) != 0) + if ((err = ceph_decode_32(p, end, &m->m_state[mds])) != 0) goto bad; } /* state_seq */ - if ((err = ceph_bl_decode_32(bl, bli, &n)) != 0) + if ((err = ceph_decode_32(p, end, &n)) != 0) goto bad; - ceph_bl_iterator_advance(bl, bli, n*(sizeof(__u32)+sizeof(__u64))); + *p += n*(sizeof(__u32)+sizeof(__u64)); /* mds_inst */ - if ((err = ceph_bl_decode_32(bl, bli, &n)) != 0) + if ((err = ceph_decode_32(p, end, &n)) != 0) goto bad; for (i=0; im_addr[mds])) != 0) + *p += sizeof(struct ceph_entity_name); + if ((err = ceph_decode_addr(p, end, &m->m_addr[mds])) != 0) goto bad; } diff --git a/trunk/ceph/kernel/mdsmap.h b/trunk/ceph/kernel/mdsmap.h index 99396c0558665..62dd35bde99a8 100644 --- a/trunk/ceph/kernel/mdsmap.h +++ b/trunk/ceph/kernel/mdsmap.h @@ -2,7 +2,6 @@ #define _FS_CEPH_MDSMAP_H #include -#include "bufferlist.h" /* see mds/MDSMap.h */ #define CEPH_MDS_STATE_DNE 0 /* down, never existed. */ @@ -40,6 +39,7 @@ extern int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m); extern int ceph_mdsmap_get_state(struct ceph_mdsmap *m, int w); extern struct ceph_entity_addr *ceph_mdsmap_get_addr(struct ceph_mdsmap *m, int w); -extern int ceph_mdsmap_decode(struct ceph_mdsmap *m, struct ceph_bufferlist *bl, struct ceph_bufferlist_iterator *bli); +extern int ceph_mdsmap_decode(struct ceph_mdsmap *m, void **p, void *end); + #endif diff --git a/trunk/ceph/kernel/messenger.c b/trunk/ceph/kernel/messenger.c index e2fd9e3f310b6..a922e13cc8b14 100644 --- a/trunk/ceph/kernel/messenger.c +++ b/trunk/ceph/kernel/messenger.c @@ -2,9 +2,9 @@ #include #include #include -#include - +#include #include +#include #include "messenger.h" #include "ktcp.h" @@ -19,6 +19,31 @@ static void try_read(struct work_struct *); static void try_write(struct work_struct *); static void try_accept(struct work_struct *); + + +/* + * calculate the number of pages a given length and offset map onto, + * if we align the data. + */ +static int calc_pages_for(int len, int off) +{ + int nr = 0; + if (len == 0) + return 0; + if (off + len < PAGE_SIZE) + return 1; + if (off) { + nr++; + len -= off; + } + nr += len >> PAGE_SHIFT; + if (len & PAGE_MASK) + nr++; + return nr; +} + + + /* * connections */ @@ -29,10 +54,9 @@ static void try_accept(struct work_struct *); static struct ceph_connection *new_connection(struct ceph_messenger *msgr) { struct ceph_connection *con; - con = kmalloc(sizeof(struct ceph_connection), GFP_KERNEL); + con = kzalloc(sizeof(struct ceph_connection), GFP_KERNEL); if (con == NULL) return NULL; - memset(con, 0, sizeof(*con)); con->msgr = msgr; @@ -199,61 +223,89 @@ static int do_connect(struct ceph_connection *con) /* * write as much of con->out_partial to the socket as we can. - * 1 -> done; and cleaned up out_partial + * 1 -> done * 0 -> socket full, but more to do * <0 -> error */ -static int write_partial(struct ceph_connection *con) +static int write_partial_kvec(struct ceph_connection *con) { - struct ceph_bufferlist *bl = &con->out_partial; - struct ceph_bufferlist_iterator *p = &con->out_pos; - int len, ret; + int ret; -more: - len = bl->b_kv[p->i_kv].iov_len - p->i_off; - /* FIXME */ - /* ret = kernel_send(con->sock, bl->b_kv[p->i_kv].iov_base + p->i_off, len); */ - if (ret < 0) return ret; - if (ret == 0) return 0; /* socket full */ - if (ret + p->i_off == bl->b_kv[p->i_kv].iov_len) { - p->i_kv++; - p->i_off = 0; - if (p->i_kv == bl->b_kvlen) - return 1; - } else { - p->i_off += ret; + while (con->out_kvec_bytes > 0) { + ret = _ksendmsg(con->sock, con->out_kvec_cur, con->out_kvec_left, con->out_kvec_bytes, 0); + if (ret < 0) return ret; /* error */ + if (ret == 0) return 0; /* socket full */ + con->out_kvec_bytes -= ret; + if (con->out_kvec_bytes == 0) + break; /* done */ + while (ret > 0) { + if (ret >= con->out_kvec_cur->iov_len) { + ret -= con->out_kvec_cur->iov_len; + con->out_kvec_cur++; + } else { + con->out_kvec_cur->iov_len -= ret; + con->out_kvec_cur->iov_base += ret; + ret = 0; + break; + } + } } - goto more; + con->out_kvec_left = 0; + return 1; /* done! */ } +static int write_partial_msg_pages(struct ceph_connection *con, struct ceph_msg *msg) +{ + struct kvec kv; + int ret; + + while (con->out_msg_pos.page < con->out_msg->nr_pages) { + kv.iov_base = kmap(msg->pages[con->out_msg_pos.page]) + con->out_msg_pos.page_pos; + kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos), + (int)(msg->hdr.data_len - con->out_msg_pos.data_pos)); + ret = _ksendmsg(con->sock, &kv, 1, kv.iov_len, 0); + if (ret < 0) return ret; + if (ret == 0) return 0; /* socket full */ + con->out_msg_pos.data_pos += ret; + con->out_msg_pos.page_pos += ret; + if (ret == kv.iov_len) { + con->out_msg_pos.page_pos = 0; + con->out_msg_pos.page++; + } + } + + /* done */ + con->out_msg = 0; + return 1; +} + + /* * build out_partial based on the next outgoing message in the queue. */ static void prepare_write_message(struct ceph_connection *con) { - struct ceph_message *m = list_entry(con->out_queue.next, struct ceph_message, list_head); - int i; + struct ceph_msg *m = list_entry(con->out_queue.next, struct ceph_msg, list_head); /* move to sending/sent list */ list_del(&m->list_head); list_add(&m->list_head, &con->out_sent); - - ceph_bl_init(&con->out_partial); - ceph_bl_iterator_init(&con->out_pos); - - /* always one chunk, for now */ - m->hdr.nchunks = 1; - m->chunklens[0] = m->payload.b_len; - - /* tag + header */ - ceph_bl_append_ref(&con->out_partial, &tag_msg, 1); - ceph_bl_append_ref(&con->out_partial, &m->hdr, sizeof(m->hdr)); - - /* payload */ - ceph_bl_append_ref(&con->out_partial, &m->chunklens[0], sizeof(__u32)); - for (i=0; ipayload.b_kvlen; i++) - ceph_bl_append_ref(&con->out_partial, m->payload.b_kv[i].iov_base, - m->payload.b_kv[i].iov_len); + con->out_msg = m; + + /* tag + hdr + front */ + con->out_kvec[0].iov_base = &tag_msg; + con->out_kvec[0].iov_len = 1; + con->out_kvec[1].iov_base = &m->hdr; + con->out_kvec[1].iov_len = sizeof(m->hdr); + con->out_kvec[2] = m->front; + con->out_kvec_left = 3; + con->out_kvec_bytes = 1 + sizeof(m->hdr) + m->front.iov_len; + con->out_kvec_cur = con->out_kvec; + + /* pages */ + con->out_msg_pos.page = 0; + con->out_msg_pos.page_pos = m->hdr.data_off & PAGE_MASK; + con->out_msg_pos.data_pos = 0; } /* @@ -262,32 +314,42 @@ static void prepare_write_message(struct ceph_connection *con) static void prepare_write_ack(struct ceph_connection *con) { con->in_seq_acked = con->in_seq; - - ceph_bl_init(&con->out_partial); - ceph_bl_iterator_init(&con->out_pos); - ceph_bl_append_ref(&con->out_partial, &tag_ack, 1); - ceph_bl_append_ref(&con->out_partial, &con->in_seq_acked, sizeof(con->in_seq_acked)); + + con->out_kvec[0].iov_base = &tag_ack; + con->out_kvec[0].iov_len = 1; + con->out_kvec[1].iov_base = &con->in_seq_acked; + con->out_kvec[1].iov_len = sizeof(con->in_seq_acked); + con->out_kvec_left = 2; + con->out_kvec_bytes = 1 + sizeof(con->in_seq_acked); + con->out_kvec_cur = con->out_kvec; } static void prepare_write_accept_announce(struct ceph_messenger *msgr, struct ceph_connection *con) { - ceph_bl_init(&con->out_partial); - ceph_bl_iterator_init(&con->out_pos); - ceph_bl_append_ref(&con->out_partial, &msgr->addr, sizeof(msgr->addr)); + con->out_kvec[0].iov_base = &msgr->inst.addr; + con->out_kvec[0].iov_len = sizeof(msgr->inst.addr); + con->out_kvec_left = 1; + con->out_kvec_bytes = sizeof(msgr->inst.addr); + con->out_kvec_cur = con->out_kvec; } static void prepare_write_accept_ready(struct ceph_connection *con) { - ceph_bl_init(&con->out_partial); - ceph_bl_iterator_init(&con->out_pos); - ceph_bl_append_ref(&con->out_partial, &tag_ready, 1); + con->out_kvec[0].iov_base = &tag_ready; + con->out_kvec[0].iov_len = 1; + con->out_kvec_left = 1; + con->out_kvec_bytes = 1; + con->out_kvec_cur = con->out_kvec; } static void prepare_write_accept_reject(struct ceph_connection *con) { - ceph_bl_init(&con->out_partial); - ceph_bl_iterator_init(&con->out_pos); - ceph_bl_append_ref(&con->out_partial, &tag_reject, 1); - ceph_bl_append_ref(&con->out_partial, &con->connect_seq, sizeof(con->connect_seq)); + con->out_kvec[0].iov_base = &tag_reject; + con->out_kvec[0].iov_len = 1; + con->out_kvec[1].iov_base = &con->connect_seq; + con->out_kvec[1].iov_len = sizeof(con->connect_seq); + con->out_kvec_left = 2; + con->out_kvec_bytes = 1 + sizeof(con->connect_seq); + con->out_kvec_cur = con->out_kvec; } /* @@ -295,24 +357,20 @@ static void prepare_write_accept_reject(struct ceph_connection *con) */ static void try_write(struct work_struct *work) { - int ret; struct ceph_connection *con; struct ceph_messenger *msgr; + int ret; con = container_of(work, struct ceph_connection, swork); msgr = con->msgr; more: - /* data queued? */ - if (con->out_partial.b_kvlen) { - ret = write_partial(con); - if (ret == 0) goto done; - - /* error or success */ - /* clean up */ - ceph_bl_init(&con->out_partial); - ceph_bl_iterator_init(&con->out_pos); - + /* kvec data queued? */ + if (con->out_kvec_left) { + ret = write_partial_kvec(con); + if (ret == 0) + goto done; + if (con->state == REJECTING) { /* FIXME do something else here, pbly? */ remove_connection(msgr, con); @@ -321,7 +379,15 @@ more: } /* TBD: handle error; return for now */ - if (ret < 0) goto done; /* error */ + if (ret < 0) + goto done; /* error */ + } + + /* msg pages? */ + if (con->out_msg) { + ret = write_partial_msg_pages(con, con->out_msg); + if (ret == 0) + goto done; } /* anything else pending? */ @@ -340,50 +406,96 @@ done: } +/* + * prepare to read a message + */ +static int prepare_read_message(struct ceph_connection *con) +{ + con->in_tag = CEPH_MSGR_TAG_MSG; + con->in_base_pos = 0; + con->in_msg = kzalloc(sizeof(*con->in_msg), GFP_KERNEL); + if (con->in_msg == NULL) { + /* TBD: we don't check for error in caller, handle error */ + derr(1, "kmalloc failure on incoming message\n"); + return -ENOMEM; + } + + ceph_msg_get(con->in_msg); + return 0; +} + /* * read (part of) a message */ static int read_message_partial(struct ceph_connection *con) { - struct ceph_message *m = con->in_partial; - int ret, s, chunkbytes, c, did; - size_t left; + struct ceph_msg *m = con->in_msg; + void *p; + int ret; + int want, left; - while (con->in_base_pos < sizeof(struct ceph_message_header)) { - left = sizeof(struct ceph_message_header) - con->in_base_pos; + /* header */ + while (con->in_base_pos < sizeof(struct ceph_msg_header)) { + left = sizeof(struct ceph_msg_header) - con->in_base_pos; ret = _krecvmsg(con->sock, &m->hdr + con->in_base_pos, left, 0); if (ret <= 0) return ret; con->in_base_pos += ret; } - if (m->hdr.nchunks == 0) return 1; /* done */ - chunkbytes = sizeof(__u32)*m->hdr.nchunks; - while (con->in_base_pos < sizeof(struct ceph_message_header) + chunkbytes) { - int off = con->in_base_pos - sizeof(struct ceph_message_header); - left = chunkbytes + sizeof(struct ceph_message_header) - con->in_base_pos; - ret = _krecvmsg(con->sock, (char*)m->chunklens + off, left, 0); + /* front */ + if (m->front.iov_len < m->hdr.front_len) { + if (m->front.iov_base == NULL) { + m->front.iov_base = kmalloc(m->hdr.front_len, GFP_KERNEL); + if (m->front.iov_base == NULL) + return -ENOMEM; + } + left = m->hdr.front_len - m->front.iov_len; + ret = _krecvmsg(con->sock, (char*)m->front.iov_base + m->front.iov_len, left, 0); if (ret <= 0) return ret; - con->in_base_pos += ret; + m->front.iov_len += ret; } - - did = 0; - for (c = 0; chdr.nchunks; c++) { - more: - left = did + m->chunklens[c] - m->payload.b_len; - if (left <= 0) { - did += m->chunklens[c]; - continue; - } - ceph_bl_prepare_append(&m->payload, left); - s = min(m->payload.b_append.iov_len, left); - ret = _krecvmsg(con->sock, m->payload.b_append.iov_base, s, 0); + + /* (page) data */ + if (m->hdr.data_len == 0) + goto done; + if (m->nr_pages == 0) { + want = calc_pages_for(m->hdr.data_len, m->hdr.data_off); + m->pages = kmalloc(want * sizeof(*m->pages), GFP_KERNEL); + if (m->pages == NULL) + return -ENOMEM; + m->nr_pages = want; + con->in_msg_pos.page = 0; + con->in_msg_pos.page_pos = m->hdr.data_off; + con->in_msg_pos.data_pos = 0; + } + while (con->in_msg_pos.data_pos < m->hdr.data_len) { + left = min((int)(m->hdr.data_len - con->in_msg_pos.data_pos), + (int)(PAGE_SIZE - con->in_msg_pos.page_pos)); + p = kmap(m->pages[con->in_msg_pos.page]); + ret = _krecvmsg(con->sock, p + con->in_msg_pos.page_pos, left, 0); if (ret <= 0) return ret; - ceph_bl_append_copied(&m->payload, s); - goto more; + con->in_msg_pos.data_pos += ret; + con->in_msg_pos.page_pos += ret; + if (con->in_msg_pos.page_pos == PAGE_SIZE) { + con->in_msg_pos.page_pos = 0; + con->in_msg_pos.page++; + } } + +done: return 1; /* done! */ } + +/* + * prepare to read an ack + */ +static void prepare_read_ack(struct ceph_connection *con) +{ + con->in_tag = CEPH_MSGR_TAG_ACK; + con->in_base_pos = 0; +} + /* * read (part of) an ack */ @@ -398,7 +510,22 @@ static int read_ack_partial(struct ceph_connection *con) return 1; /* done */ } +static void process_ack(struct ceph_connection *con, __u32 ack) +{ + struct ceph_msg *m; + while (!list_empty(&con->out_sent)) { + m = list_entry(con->out_sent.next, struct ceph_msg, list_head); + if (m->hdr.seq > ack) break; + dout(5, "got ack for %d type %d at %p\n", m->hdr.seq, m->hdr.type, m); + list_del(&m->list_head); + ceph_msg_put(m); + } +} + +/* + * read portion of handshake on a newly accepted connection + */ static int read_accept_partial(struct ceph_connection *con) { int ret; @@ -422,48 +549,6 @@ static int read_accept_partial(struct ceph_connection *con) return 1; /* done */ } -/* - * prepare to read a message - */ -static void prepare_read_message(struct ceph_connection *con) -{ - con->in_tag = CEPH_MSGR_TAG_MSG; - con->in_base_pos = 0; - con->in_partial = kmalloc(sizeof(struct ceph_message), GFP_KERNEL); - if (con->in_partial == NULL) { - /* TBD: we don't check for error in caller, handle error */ - printk(KERN_INFO "malloc failure\n"); - goto done; - } - - ceph_get_msg(con->in_partial); - ceph_bl_init(&con->in_partial->payload); - ceph_bl_iterator_init(&con->in_pos); -done: - return; -} - -/* - * prepare to read an ack - */ -static void prepare_read_ack(struct ceph_connection *con) -{ - con->in_tag = CEPH_MSGR_TAG_ACK; - con->in_base_pos = 0; -} - -static void process_ack(struct ceph_connection *con, __u32 ack) -{ - struct ceph_message *m; - while (!list_empty(&con->out_sent)) { - m = list_entry(con->out_sent.next, struct ceph_message, list_head); - if (m->hdr.seq > ack) break; - dout(5, "got ack for %d type %d at %p\n", m->hdr.seq, m->hdr.type, m); - list_del(&m->list_head); - ceph_put_msg(m); - } -} - /* * call after a new connection's handshake has completed */ @@ -476,7 +561,7 @@ static void process_accept(struct ceph_connection *con) existing = get_connection(con->msgr, &con->peer_addr); if (existing) { spin_lock(&existing->con_lock); - if ((existing->state == CONNECTING && compare_addr(&con->msgr->addr, &con->peer_addr)) || + if ((existing->state == CONNECTING && compare_addr(&con->msgr->inst.addr, &con->peer_addr)) || (existing->state == OPEN && con->connect_seq == existing->connect_seq)) { /* replace existing with new connection */ replace_connection(con->msgr, existing, con); @@ -556,9 +641,9 @@ more: /* TBD: do something with error */ if (ret <= 0) goto done; /* got a full message! */ - msgr->dispatch(con->msgr->parent, con->in_partial); - ceph_put_msg(con->in_partial); - con->in_partial = 0; + msgr->dispatch(con->msgr->parent, con->in_msg); + ceph_msg_put(con->in_msg); + con->in_msg = 0; con->in_tag = CEPH_MSGR_TAG_READY; goto more; } @@ -593,20 +678,19 @@ static void try_accept(struct work_struct *work) msgr = container_of(work, struct ceph_messenger, awork); sock = msgr->listen_sock; - - printk(KERN_INFO "Entered try_accept\n"); + dout(5, "Entered try_accept\n"); if (kernel_accept(sock, &new_sock, sock->file->f_flags) < 0) { - printk(KERN_INFO "error accepting connection \n"); + derr(1, "error accepting connection\n"); goto done; } - printk(KERN_INFO "accepted connection \n"); + dout(5, "accepted connection \n"); /* get the address at the other end */ memset(&saddr, 0, sizeof(saddr)); if (new_sock->ops->getname(new_sock, (struct sockaddr *)&saddr, &len, 2)) { - printk(KERN_INFO "getname error connection aborted\n"); + derr(1, "getname error connection aborted\n"); sock_release(new_sock); goto done; } @@ -614,7 +698,7 @@ static void try_accept(struct work_struct *work) /* initialize the msgr connection */ new_con = new_connection(msgr); if (new_con == NULL) { - printk(KERN_INFO "malloc failure\n"); + derr(1, "malloc failure\n"); sock_release(new_sock); goto done; } @@ -642,10 +726,9 @@ struct ceph_messenger *ceph_create_messenger(struct sockaddr *saddr) { struct ceph_messenger *msgr; - msgr = kmalloc(sizeof(*msgr), GFP_KERNEL); + msgr = kzalloc(sizeof(*msgr), GFP_KERNEL); if (msgr == NULL) return NULL; - memset(msgr, 0, sizeof(*msgr)); spin_lock_init(&msgr->con_lock); @@ -663,11 +746,16 @@ struct ceph_messenger *ceph_create_messenger(struct sockaddr *saddr) /* * queue up an outgoing message + * + * will take+drop msgr, then connection locks. */ -int ceph_messenger_send(struct ceph_messenger *msgr, struct ceph_message *msg) +int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg) { struct ceph_connection *con; + /* set source */ + msg->hdr.src = msgr->inst; + /* do we have the connection? */ spin_lock(&msgr->con_lock); con = get_connection(msgr, &msg->hdr.dst.addr); @@ -676,13 +764,15 @@ int ceph_messenger_send(struct ceph_messenger *msgr, struct ceph_message *msg) if (IS_ERR(con)) return PTR_ERR(con); dout(5, "opening new connection to peer %x:%d\n", - ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), msg->hdr.dst.addr.ipaddr.sin_port); + ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), + msg->hdr.dst.addr.ipaddr.sin_port); con->peer_addr = msg->hdr.dst.addr; con->state = CONNECTING; add_connection(msgr, con); } else { dout(5, "had connection to peer %x:%d\n", - msg->hdr.dst.addr.ipaddr.sin_addr.s_addr, msg->hdr.dst.addr.ipaddr.sin_port); + msg->hdr.dst.addr.ipaddr.sin_addr.s_addr, + msg->hdr.dst.addr.ipaddr.sin_port); } spin_unlock(&msgr->con_lock); @@ -695,43 +785,68 @@ int ceph_messenger_send(struct ceph_messenger *msgr, struct ceph_message *msg) /* queue */ dout(1, "queuing outgoing message for %s.%d\n", ceph_name_type_str(msg->hdr.dst.name.type), msg->hdr.dst.name.num); - ceph_get_msg(msg); + ceph_msg_get(msg); list_add(&con->out_queue, &msg->list_head); - + spin_unlock(&con->con_lock); put_connection(con); + return 0; } -struct ceph_message *ceph_new_message(int type, int size) +struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off) { - struct ceph_message *m; + struct ceph_msg *m; + int i; - m = kmalloc(sizeof(*m), GFP_KERNEL); + m = kzalloc(sizeof(*m), GFP_KERNEL); if (m == NULL) - return ERR_PTR(-ENOMEM); - memset(m, 0, sizeof(*m)); + goto out; + atomic_set(&m->nref, 1); m->hdr.type = type; - - if (size) { - BUG_ON(size); /* implement me */ + m->hdr.front_len = front_len; + m->hdr.data_len = page_len; + m->hdr.data_off = page_off; + + /* front */ + m->front.iov_base = kmalloc(front_len, GFP_KERNEL); + if (m->front.iov_base == NULL) + goto out2; + m->front.iov_len = front_len; + + /* pages */ + m->nr_pages = calc_pages_for(page_len, page_off); + if (m->nr_pages) { + m->pages = kzalloc(m->nr_pages*sizeof(*m->pages), GFP_KERNEL); + for (i=0; inr_pages; i++) { + m->pages[i] = alloc_page(GFP_KERNEL); + if (m->pages[i] == NULL) + goto out2; + } } - return m; -} - +out2: + ceph_msg_put(m); +out: + return ERR_PTR(-ENOMEM); +} -int ceph_bl_decode_addr(struct ceph_bufferlist *bl, struct ceph_bufferlist_iterator *bli, struct ceph_entity_addr *v) +void ceph_msg_put(struct ceph_msg *m) { - int err; - if (!ceph_bl_decode_have(bl, bli, sizeof(*v))) - return -EINVAL; - if ((err = ceph_bl_decode_32(bl, bli, &v->erank)) != 0) - return -EINVAL; - if ((err = ceph_bl_decode_32(bl, bli, &v->nonce)) != 0) - return -EINVAL; - ceph_bl_copy(bl, bli, &v->ipaddr, sizeof(v->ipaddr)); - return 0; + if (atomic_dec_and_test(&m->nref)) { + int i; + if (m->pages) { + for (i=0; inr_pages; i++) + if (m->pages[i]) + kfree(m->pages[i]); + kfree(m->pages); + } + if (m->front.iov_base) + kfree(m->front.iov_base); + kfree(m); + } } + + diff --git a/trunk/ceph/kernel/messenger.h b/trunk/ceph/kernel/messenger.h index 9ea1e07a7605b..d2acc4135feb0 100644 --- a/trunk/ceph/kernel/messenger.h +++ b/trunk/ceph/kernel/messenger.h @@ -6,13 +6,12 @@ #include #include #include -#include "bufferlist.h" -struct ceph_message; +struct ceph_msg; -typedef void (*ceph_messenger_dispatch_t) (void *p, struct ceph_message *m); +typedef void (*ceph_messenger_dispatch_t) (void *p, struct ceph_msg *m); -__inline__ const char *ceph_name_type_str(int t) { +static __inline__ const char *ceph_name_type_str(int t) { switch (t) { case CEPH_ENTITY_TYPE_MON: return "mon"; case CEPH_ENTITY_TYPE_MDS: return "mds"; @@ -29,22 +28,29 @@ struct ceph_messenger { ceph_messenger_dispatch_t dispatch; struct socket *listen_sock; /* listening socket */ struct work_struct awork; /* accept work */ - struct ceph_entity_addr addr; /* my address */ + struct ceph_entity_inst inst; /* my name+address */ spinlock_t con_lock; struct list_head con_all; /* all connections */ struct list_head con_accepting; /* doing handshake, or */ struct radix_tree_root con_open; /* established. see get_connection() */ }; -struct ceph_message { - struct ceph_message_header hdr; /* header */ - __u32 chunklens[2]; - struct ceph_bufferlist payload; +struct ceph_msg { + struct ceph_msg_header hdr; /* header */ + struct kvec front; /* first bit of message */ + struct page **pages; /* data payload */ + unsigned nr_pages; /* size of page array */ struct list_head list_head; atomic_t nref; }; +struct ceph_msg_pos { + int page, page_pos; /* which page; -3=tag, -2=hdr, -1=front */ + int data_pos; +}; + + /* current state of connection */ enum ceph_connection_state { NEW = 1, @@ -75,16 +81,23 @@ struct ceph_connection { /* out queue */ struct list_head out_queue; - struct ceph_bufferlist out_partial; /* refereces existing bufferlists; do not free() */ - struct ceph_bufferlist_iterator out_pos; + + struct kvec out_kvec[4], + *out_kvec_cur; + int out_kvec_left; /* kvec's left */ + int out_kvec_bytes; /* bytes left */ + + struct ceph_msg *out_msg; + struct ceph_msg_pos out_msg_pos; + struct list_head out_sent; /* sending/sent but unacked; resend if connection drops */ /* partially read message contents */ char in_tag; /* READY (accepting, or no in-progress read) or ACK or MSG */ int in_base_pos; /* for ack seq, or msg headers, or accept handshake */ __u32 in_partial_ack; - struct ceph_message *in_partial; - struct ceph_bufferlist_iterator in_pos; /* for msg payload */ + struct ceph_msg *in_msg; + struct ceph_msg_pos in_msg_pos; struct work_struct rwork; /* received work */ struct work_struct swork; /* send work */ @@ -93,25 +106,55 @@ struct ceph_connection { }; -/* messenger */ -extern int ceph_messenger_send(struct ceph_messenger *msgr, struct ceph_message *msg); - +extern struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off); +static __inline__ void ceph_msg_get(struct ceph_msg *msg) { + atomic_inc(&msg->nref); +} +extern void ceph_msg_put(struct ceph_msg *msg); +extern int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg); -/* messages */ -extern struct ceph_message *ceph_new_message(int type, int size); -static __inline__ void ceph_put_msg(struct ceph_message *msg) { - if (atomic_dec_and_test(&msg->nref)) { - ceph_bl_clear(&msg->payload); - kfree(msg); - } +static __inline__ int ceph_decode_64(void **p, void *end, __u64 *v) { + if (*p + sizeof(v) > end) + return -EINVAL; + *v = le64_to_cpu(*(__u64*)p); + p += sizeof(*v); + return 0; +} +static __inline__ int ceph_decode_32(void **p, void *end, __u32 *v) { + if (*p + sizeof(v) > end) + return -EINVAL; + *v = le32_to_cpu(*(__u32*)p); + p += sizeof(*v); + return 0; +} +static __inline__ int ceph_decode_16(void **p, void *end, __u16 *v) { + if (*p + sizeof(v) > end) + return -EINVAL; + *v = le16_to_cpu(*(__u16*)p); + p += sizeof(*v); + return 0; +} +static __inline__ int ceph_decode_copy(void **p, void *end, void *v, int len) { + if (*p + len > end) + return -EINVAL; + memcpy(v, *p, len); + *p += len; + return 0; } -static __inline__ void ceph_get_msg(struct ceph_message *msg) { - atomic_inc(&msg->nref); +static __inline__ int ceph_decode_addr(void **p, void *end, struct ceph_entity_addr *v) { + int err; + if (*p + sizeof(*v) > end) + return -EINVAL; + if ((err = ceph_decode_32(p, end, &v->erank)) != 0) + return -EINVAL; + if ((err = ceph_decode_32(p, end, &v->nonce)) != 0) + return -EINVAL; + ceph_decode_copy(p, end, &v->ipaddr, sizeof(v->ipaddr)); + return 0; } -extern int ceph_bl_decode_addr(struct ceph_bufferlist *bl, struct ceph_bufferlist_iterator *bli, struct ceph_entity_addr *v); #endif diff --git a/trunk/ceph/kernel/mon_client.h b/trunk/ceph/kernel/mon_client.h index 1e1147e0bfda6..d44b9c5e687ec 100644 --- a/trunk/ceph/kernel/mon_client.h +++ b/trunk/ceph/kernel/mon_client.h @@ -12,7 +12,7 @@ struct ceph_mon_client { }; extern void ceph_monc_init(struct ceph_mon_client *monc); -extern void ceph_monc_handle_monmap(struct ceph_mon_client *monc, struct ceph_message *m); +extern void ceph_monc_handle_monmap(struct ceph_mon_client *monc, struct ceph_msg *m); extern void ceph_monc_request_mdsmap(struct ceph_mon_client *monc, __u64 have); extern void ceph_monc_request_osdmap(struct ceph_mon_client *monc, __u64 have); diff --git a/trunk/ceph/kernel/monmap.h b/trunk/ceph/kernel/monmap.h index e76a45977b7c9..5e623b10352d2 100644 --- a/trunk/ceph/kernel/monmap.h +++ b/trunk/ceph/kernel/monmap.h @@ -2,7 +2,6 @@ #define _FS_CEPH_MONMAP_H #include -#include "bufferlist.h" /* * monitor map @@ -13,6 +12,6 @@ struct ceph_monmap { struct ceph_entity_inst *mon_inst; }; -extern int ceph_monmap_decode(struct ceph_monmap *m, struct ceph_bufferlist *bl); +extern int ceph_monmap_decode(struct ceph_monmap *m, void **p, void *end); #endif diff --git a/trunk/ceph/kernel/osd_client.h b/trunk/ceph/kernel/osd_client.h index ef99ba2c0a39c..6f643646b9427 100644 --- a/trunk/ceph/kernel/osd_client.h +++ b/trunk/ceph/kernel/osd_client.h @@ -15,8 +15,8 @@ struct ceph_osd_client { }; extern void ceph_osdc_init(struct ceph_osd_client *osdc); -extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_message *msg); -extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_message *msg); +extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg); +extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg); #endif diff --git a/trunk/ceph/kernel/super.c b/trunk/ceph/kernel/super.c index 9a213c4090bd9..7eba370f833f5 100644 --- a/trunk/ceph/kernel/super.c +++ b/trunk/ceph/kernel/super.c @@ -135,10 +135,9 @@ static int ceph_set_super(struct super_block *s, void *data) s->s_flags = args->mntflags; - sbinfo = kmalloc(sizeof(struct ceph_super_info), GFP_KERNEL); + sbinfo = kzalloc(sizeof(struct ceph_super_info), GFP_KERNEL); if (!sbinfo) return -ENOMEM; - memset(sbinfo, 0, sizeof(*sbinfo)); s->s_fs_info = sbinfo; memcpy(&sbinfo->mount_args, args, sizeof(*args)); @@ -247,7 +246,7 @@ static int parse_mount_args(int flags, char *options, const char *dev_name, stru token = match_token(c, arg_tokens, argstr); ret = match_int(&argstr[0], &intval); if (ret < 0) { - dout(0, "bad mount arg\n"); + dout(0, "bad mount arg, not int\n"); continue; } switch (token) { @@ -264,6 +263,7 @@ static int parse_mount_args(int flags, char *options, const char *dev_name, stru ceph_debug = intval; break; default: + derr(1, "bad mount option %s\n", c); continue; } } -- 2.39.5