From c0d0c4bd47319a3934fccbb41b620174ecf06b4d Mon Sep 17 00:00:00 2001 From: sageweil Date: Thu, 29 Nov 2007 00:27:19 +0000 Subject: [PATCH] kernel messenger sort of working! git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2139 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/include/ceph_fs.h | 6 +- trunk/ceph/kernel/client.c | 8 +- trunk/ceph/kernel/ktcp.c | 54 ++++------ trunk/ceph/kernel/mds_client.c | 2 + trunk/ceph/kernel/messenger.c | 188 ++++++++++++++++++++++++++------- trunk/ceph/kernel/messenger.h | 50 +++++---- trunk/ceph/kernel/osd_client.c | 4 + trunk/ceph/kernel/super.c | 35 +++--- trunk/ceph/kernel/super.h | 1 + 9 files changed, 236 insertions(+), 112 deletions(-) diff --git a/trunk/ceph/include/ceph_fs.h b/trunk/ceph/include/ceph_fs.h index a60ed84099d31..987996a623ee9 100644 --- a/trunk/ceph/include/ceph_fs.h +++ b/trunk/ceph/include/ceph_fs.h @@ -189,9 +189,9 @@ struct ceph_entity_addr { struct sockaddr_in ipaddr; }; -#define ceph_entity_addr_is_local(a,b) \ - ((a).nonce == (b).nonce && \ - (a).ipaddr == (b).ipaddr) +#define ceph_entity_addr_is_local(a,b) \ + ((a).nonce == (b).nonce && \ + (a).ipaddr.sin_addr.s_addr == (b).ipaddr.sin_addr.s_addr) #define compare_addr(a, b) \ ((a)->erank == (b)->erank && \ diff --git a/trunk/ceph/kernel/client.c b/trunk/ceph/kernel/client.c index 2c11678458411..93b07fcb4b5d6 100644 --- a/trunk/ceph/kernel/client.c +++ b/trunk/ceph/kernel/client.c @@ -9,7 +9,7 @@ /* debug level; defined in include/ceph_fs.h */ -int ceph_debug = 20; +int ceph_debug = 200; /* * directory of filesystems mounted by this host @@ -66,7 +66,7 @@ static struct ceph_client *create_client(struct ceph_mount_args *args) get_client_counter(); /* messenger */ - cl->msgr = ceph_messenger_create(); + cl->msgr = ceph_messenger_create(&args->my_addr); if (IS_ERR(cl->msgr)) { err = PTR_ERR(cl->msgr); goto fail; @@ -236,7 +236,9 @@ void ceph_put_client(struct ceph_client *cl) */ void ceph_dispatch(struct ceph_client *client, struct ceph_msg *msg) { - dout(5, "dispatch %p type %d\n", (void*)msg, msg->hdr.type); + dout(5, "dispatch from %s%d type %d len %d+%d\n", + ceph_name_type_str(msg->hdr.src.name.type), msg->hdr.src.name.num, + msg->hdr.type, msg->hdr.front_len, msg->hdr.data_len); /* deliver the message */ switch (msg->hdr.type) { diff --git a/trunk/ceph/kernel/ktcp.c b/trunk/ceph/kernel/ktcp.c index 4dd6c3235ae0f..c5c9189eb62fc 100644 --- a/trunk/ceph/kernel/ktcp.c +++ b/trunk/ceph/kernel/ktcp.c @@ -17,13 +17,14 @@ static void ceph_data_ready(struct sock *sk, int count_unused) struct ceph_connection *con; struct ceph_messenger *msgr; - printk(KERN_INFO "Entered ceph_data_ready \n"); - if (sk->sk_state == TCP_LISTEN) { msgr = (struct ceph_messenger *)sk->sk_user_data; + dout(30, "ceph_data_ready listener %p\n", msgr); queue_work(recv_wq, &msgr->awork); } else { con = (struct ceph_connection *)sk->sk_user_data; + dout(30, "ceph_data_ready connection %p state = %u, queuing rwork\n", + con, con->state); queue_work(recv_wq, &con->rwork); } } @@ -33,9 +34,9 @@ static void ceph_write_space(struct sock *sk) { struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data; - printk(KERN_INFO "Entered ceph_write_space state = %u\n",con->state); - if (test_bit(WRITE_PEND, &con->state)) { - printk(KERN_INFO "WRITE_PEND set in connection\n"); + dout(30, "ceph_write_space %p state = %u\n", con, con->state); + if (test_bit(WRITE_PENDING, &con->state)) { + dout(30, "ceph_write_space %p queueing write work\n", con); queue_work(send_wq, &con->swork); } } @@ -44,13 +45,14 @@ static void ceph_write_space(struct sock *sk) static void ceph_state_change(struct sock *sk) { struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data; - printk(KERN_INFO "Entered ceph_state_change state = %u\n", con->state); + dout(30, "ceph_state_change %p state = %u\n", con, con->state); if (sk->sk_state == TCP_ESTABLISHED) { - if (test_and_clear_bit(CONNECTING, &con->state) || - test_bit(ACCEPTING, &con->state)) - set_bit(OPEN, &con->state); - ceph_write_space(sk); + /*if (test_bit(CONNECTING, &con->state) || + test_bit(ACCEPTING, &con->state)) {*/ + dout(30, "ceph_state_change %p socket established, queuing swork\n", con); + queue_work(send_wq, &con->swork); + /*}*/ } } @@ -76,20 +78,18 @@ int ceph_tcp_connect(struct ceph_connection *con) int ret; struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr; - set_bit(CONNECTING, &con->state); - ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock); if (ret < 0) { derr(1, "ceph_tcp_connect sock_create_kern error: %d\n", ret); goto done; } - /* setup callbacks */ set_sock_callbacks(con->sock, (void *)con); - - ret = con->sock->ops->connect(con->sock, paddr, + + ret = con->sock->ops->connect(con->sock, paddr, sizeof(struct sockaddr_in), O_NONBLOCK); - if (ret == -EINPROGRESS) return 0; + if (ret == -EINPROGRESS) + return 0; if (ret < 0) { /* TBD check for fatal errors, retry if not fatal.. */ derr(1, "ceph_tcp_connect kernel_connect error: %d\n", ret); @@ -144,9 +144,7 @@ int ceph_tcp_listen(struct ceph_messenger *msgr) derr(0, "failed to getsockname: %d\n", ret); goto err; } - dout(0, "ceph_tcp_listen on %x:%d\n", - ntohl(myaddr->sin_addr.s_addr), - ntohs(myaddr->sin_port)); + dout(0, "ceph_tcp_listen on port %d\n", ntohs(myaddr->sin_port)); ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&optval, sizeof(optval)); @@ -208,7 +206,6 @@ int ceph_tcp_accept(struct socket *sock, struct ceph_connection *con) goto err; } - set_bit(ACCEPTING, &con->state); done: return ret; err: @@ -225,18 +222,12 @@ int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) struct msghdr msg = {.msg_flags = 0}; int rlen = 0; /* length read */ - printk(KERN_INFO "entered krevmsg\n"); + dout(30, "ceph_tcp_recvmsg %p len %d\n", sock, (int)len); msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL; - /* receive one kvec for now... */ rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags); - if (rlen < 0) { - printk(KERN_INFO "kernel_recvmsg error: %d\n", rlen); - } - /* TBD: kernel_recvmsg doesn't fill in the name and namelen - */ + dout(30, "ceph_tcp_recvmsg %p len %d ret = %d\n", sock, (int)len, rlen); return(rlen); - } /* @@ -247,13 +238,10 @@ int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, size_t kvlen, size_t struct msghdr msg = {.msg_flags = 0}; int rlen = 0; - printk(KERN_INFO "entered ksendmsg\n"); + dout(30, "ceph_tcp_sendmsg %p len %d\n", sock, (int)len); msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL; - rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len); - if (rlen < 0) { - printk(KERN_INFO "kernel_sendmsg error: %d\n", rlen); - } + dout(30, "ceph_tcp_sendmsg %p len %d ret = %d\n", sock, (int)len, rlen); return(rlen); } diff --git a/trunk/ceph/kernel/mds_client.c b/trunk/ceph/kernel/mds_client.c index 4b8c0c4cd66af..7cb981d62dbb2 100644 --- a/trunk/ceph/kernel/mds_client.c +++ b/trunk/ceph/kernel/mds_client.c @@ -221,12 +221,14 @@ static void wait_for_new_map(struct ceph_mds_client *mdsc) void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client) { + spin_lock_init(&mdsc->lock); mdsc->client = client; mdsc->mdsmap = 0; /* none yet */ mdsc->sessions = 0; mdsc->max_sessions = 0; mdsc->last_tid = 0; INIT_RADIX_TREE(&mdsc->request_tree, GFP_KERNEL); + mdsc->last_requested_map = 0; init_completion(&mdsc->map_waiters); } diff --git a/trunk/ceph/kernel/messenger.c b/trunk/ceph/kernel/messenger.c index 6ef6d4a4523ab..d7cfe5f3fde57 100644 --- a/trunk/ceph/kernel/messenger.c +++ b/trunk/ceph/kernel/messenger.c @@ -64,7 +64,7 @@ static struct ceph_connection *new_connection(struct ceph_messenger *msgr) INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_sent); - spin_lock_init(&con->con_lock); + spin_lock_init(&con->lock); set_bit(NEW, &con->state); INIT_WORK(&con->rwork, try_read); /* setup work structure */ INIT_WORK(&con->swork, try_write); /* setup work structure */ @@ -178,7 +178,7 @@ static void __remove_connection(struct ceph_messenger *msgr, struct ceph_connect dout(20, "__remove_connection %p from %p\n", con, msgr); list_del(&con->list_all); - if (test_bit(CONNECTING, &con->state) || + if (test_bit(CONNECTING, &con->state) || test_bit(OPEN, &con->state)) { /* remove from con_open too */ key = hash_addr(&con->peer_addr); @@ -231,10 +231,11 @@ static int write_partial_kvec(struct ceph_connection *con) { int ret; + dout(30, "write_partial_kvec %p left %d vec %d bytes\n", con, + con->out_kvec_left, con->out_kvec_bytes); while (con->out_kvec_bytes > 0) { ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, con->out_kvec_left, con->out_kvec_bytes); - if (ret < 0) return ret; /* error */ - if (ret == 0) return 0; /* socket full */ + if (ret <= 0) goto out; con->out_kvec_bytes -= ret; if (con->out_kvec_bytes == 0) break; /* done */ @@ -251,7 +252,11 @@ static int write_partial_kvec(struct ceph_connection *con) } } con->out_kvec_left = 0; - return 1; /* done! */ + ret = 1; +out: + dout(30, "write_partial_kvec %p left %d vec %d bytes ret = %d\n", con, + con->out_kvec_left, con->out_kvec_bytes, ret); + return ret; /* done! */ } static int write_partial_msg_pages(struct ceph_connection *con, struct ceph_msg *msg) @@ -289,12 +294,15 @@ static void prepare_write_message(struct ceph_connection *con) /* move to sending/sent list */ list_del(&m->list_head); - list_add(&m->list_head, &con->out_sent); + list_add_tail(&m->list_head, &con->out_sent); con->out_msg = m; /* encode header */ ceph_encode_header(&con->out_hdr, &m->hdr); + dout(20, "prepare_write_message %p seq %d type %d len %d+%d\n", + m, m->hdr.seq, m->hdr.type, m->hdr.front_len, m->hdr.data_len); + /* tag + hdr + front */ con->out_kvec[0].iov_base = &tag_msg; con->out_kvec[0].iov_len = 1; @@ -309,6 +317,8 @@ static void prepare_write_message(struct ceph_connection *con) con->out_msg_pos.page = 0; con->out_msg_pos.page_pos = m->hdr.data_off & PAGE_MASK; con->out_msg_pos.data_pos = 0; + + set_bit(WRITE_PENDING, &con->state); } /* @@ -325,6 +335,19 @@ static void prepare_write_ack(struct ceph_connection *con) con->out_kvec_left = 2; con->out_kvec_bytes = 1 + sizeof(con->in_seq_acked); con->out_kvec_cur = con->out_kvec; + set_bit(WRITE_PENDING, &con->state); +} + +static void prepare_write_connect(struct ceph_messenger *msgr, struct ceph_connection *con) +{ + con->out_kvec[0].iov_base = &msgr->inst.addr; + con->out_kvec[0].iov_len = sizeof(msgr->inst.addr); + con->out_kvec[1].iov_base = &con->connect_seq; + con->out_kvec[1].iov_len = sizeof(con->connect_seq); + con->out_kvec_left = 2; + con->out_kvec_bytes = sizeof(msgr->inst.addr) + sizeof(con->connect_seq); + con->out_kvec_cur = con->out_kvec; + set_bit(WRITE_PENDING, &con->state); } static void prepare_write_accept_announce(struct ceph_messenger *msgr, struct ceph_connection *con) @@ -334,6 +357,7 @@ static void prepare_write_accept_announce(struct ceph_messenger *msgr, struct ce con->out_kvec_left = 1; con->out_kvec_bytes = sizeof(msgr->inst.addr); con->out_kvec_cur = con->out_kvec; + set_bit(WRITE_PENDING, &con->state); } static void prepare_write_accept_ready(struct ceph_connection *con) @@ -343,7 +367,9 @@ static void prepare_write_accept_ready(struct ceph_connection *con) con->out_kvec_left = 1; con->out_kvec_bytes = 1; con->out_kvec_cur = con->out_kvec; + set_bit(WRITE_PENDING, &con->state); } + static void prepare_write_accept_reject(struct ceph_connection *con) { con->out_kvec[0].iov_base = &tag_reject; @@ -353,6 +379,7 @@ static void prepare_write_accept_reject(struct ceph_connection *con) con->out_kvec_left = 2; con->out_kvec_bytes = 1 + sizeof(con->connect_seq); con->out_kvec_cur = con->out_kvec; + set_bit(WRITE_PENDING, &con->state); } /* @@ -371,18 +398,15 @@ more: /* kvec data queued? */ if (con->out_kvec_left) { ret = write_partial_kvec(con); - if (ret == 0) - goto done; - - if (test_bit(REJECTING, &con->state)) { + if (test_and_clear_bit(REJECTING, &con->state)) { + dout(30, "try_write done rejecting, state %u, closing\n", con->state); /* FIXME do something else here, pbly? */ remove_connection(msgr, con); set_bit(CLOSED, &con->state); put_connection(con); } - - /* TBD: handle error; return for now */ - if (ret < 0) { + if (ret <= 0) { + /* TBD: handle error; return for now */ con->error = ret; goto done; /* error */ } @@ -406,8 +430,9 @@ more: } /* hmm, nothing to do! No more writes pending? */ - if (ret) - clear_bit(WRITE_PEND, &con->state); + dout(30, "try_write nothing else to write\n"); + clear_bit(WRITE_PENDING, &con->state); + done: return; } @@ -535,8 +560,75 @@ static void process_ack(struct ceph_connection *con, __u32 ack) } +/* + * read portion of connect-side handshake on a new connection + */ +static int read_connect_partial(struct ceph_connection *con) +{ + int ret, to; + dout(20, "read_connect_partial %p start at %d\n", con, con->in_base_pos); + + /* actual_peer_addr */ + to = sizeof(con->actual_peer_addr); + while (con->in_base_pos < to) { + int left = to - con->in_base_pos; + int have = con->in_base_pos; + ret = ceph_tcp_recvmsg(con->sock, (char*)&con->actual_peer_addr + have, left); + if (ret <= 0) goto out; + con->in_base_pos += ret; + } + + /* in_tag */ + to = sizeof(con->actual_peer_addr) + 1; + if (con->in_base_pos < to) { + ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); + if (ret <= 0) goto out; + con->in_base_pos += ret; + } + + /* peer_connect_seq */ + to += sizeof(con->peer_connect_seq); + if (con->in_base_pos < to) { + int left = to - con->in_base_pos; + int have = sizeof(con->peer_connect_seq) - left; + ret = ceph_tcp_recvmsg(con->sock, (char*)&con->peer_connect_seq + have, left); + if (ret <= 0) goto out; + con->in_base_pos += ret; + } + ret = 1; +out: + dout(20, "read_connect_partial %p end at %d ret %d\n", con, con->in_base_pos, ret); + return ret; /* done */ +} + +static void process_connect(struct ceph_connection *con) +{ + dout(20, "process_connect on %p tag %d\n", con, (int)con->in_tag); + clear_bit(CONNECTING, &con->state); + if (!ceph_entity_addr_is_local(con->peer_addr, con->actual_peer_addr)) { + derr(1, "process_connect wrong peer, want %x:%d/%d, got %x:%d/%d, wtf\n", + ntohl(con->peer_addr.ipaddr.sin_addr.s_addr), + ntohs(con->peer_addr.ipaddr.sin_port), + con->peer_addr.nonce, + ntohl(con->actual_peer_addr.ipaddr.sin_addr.s_addr), + ntohs(con->actual_peer_addr.ipaddr.sin_port), + con->actual_peer_addr.nonce); + con->in_tag = CEPH_MSGR_TAG_REJECT; + } + if (con->in_tag == CEPH_MSGR_TAG_REJECT) { + dout(10, "process_connect got REJECT peer seq %u\n", con->peer_connect_seq); + set_bit(CLOSED, &con->state); + } + if (con->in_tag == CEPH_MSGR_TAG_READY) { + dout(10, "process_connect got READY, now open\n"); + set_bit(OPEN, &con->state); + } +} + + + /* - * read portion of handshake on a newly accepted connection + * read portion of accept-side handshake on a newly accepted connection */ static int read_accept_partial(struct ceph_connection *con) { @@ -572,7 +664,7 @@ static void process_accept(struct ceph_connection *con) spin_lock(&con->msgr->con_lock); existing = get_connection(con->msgr, &con->peer_addr); if (existing) { - spin_lock(&existing->con_lock); + spin_lock(&existing->lock); if ((test_bit(CONNECTING, &existing->state) && compare_addr(&con->msgr->inst.addr, &con->peer_addr)) || (test_bit(OPEN, &existing->state) && @@ -584,20 +676,22 @@ static void process_accept(struct ceph_connection *con) con->out_seq = existing->out_seq; set_bit(OPEN, &con->state); set_bit(CLOSED, &existing->state); + clear_bit(OPEN, &existing->state); } else { /* reject new connection */ set_bit(REJECTING, &con->state); con->connect_seq = existing->connect_seq; /* send this with the reject */ } - spin_unlock(&existing->con_lock); + spin_unlock(&existing->lock); put_connection(existing); } else { add_connection(con->msgr, con); - set_bit(OPEN, &con->state); + con->state = OPEN; } spin_unlock(&con->msgr->con_lock); /* the result? */ + clear_bit(ACCEPTING, &con->state); if (test_bit(REJECTING, &con->state)) prepare_write_accept_reject(con); else @@ -615,6 +709,7 @@ static void try_read(struct work_struct *work) struct ceph_messenger *msgr; con = container_of(work, struct ceph_connection, rwork); + spin_lock(&con->lock); msgr = con->msgr; more: @@ -624,12 +719,20 @@ more: if (test_bit(CLOSED, &con->state)) goto done; if (test_bit(ACCEPTING, &con->state)) { + dout(20, "try_read accepting\n"); ret = read_accept_partial(con); if (ret <= 0) goto done; /* accepted */ process_accept(con); goto more; } + if (test_bit(CONNECTING, &con->state)) { + dout(20, "try_read connecting\n"); + ret = read_connect_partial(con); + if (ret <= 0) goto done; + process_connect(con); + if (test_bit(CLOSED, &con->state)) goto done; + } if (con->in_tag == CEPH_MSGR_TAG_READY) { ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); @@ -639,7 +742,8 @@ more: else if (con->in_tag == CEPH_MSGR_TAG_ACK) prepare_read_ack(con); else { - printk(KERN_INFO "bad tag %d\n", (int)con->in_tag); + derr(2, "try_read got bad tag %d\n", (int)con->in_tag); + ret = -EINVAL; goto bad; } goto more; @@ -662,10 +766,12 @@ more: con->in_tag = CEPH_MSGR_TAG_READY; goto more; } + derr(2, "try_read bad con->in_tag = %d\n", (int)con->in_tag); bad: BUG_ON(1); /* shouldn't get here */ done: con->error = ret; + spin_unlock(&con->lock); return; } @@ -688,21 +794,18 @@ static void try_accept(struct work_struct *work) derr(1, "malloc failure\n"); goto done; } - - if(ceph_tcp_accept(msgr->listen_sock, new_con) < 0) { + if (ceph_tcp_accept(msgr->listen_sock, new_con) < 0) { derr(1, "error accepting connection\n"); - kfree(new_con); + put_connection(new_con); goto done; } dout(5, "accepted connection \n"); new_con->in_tag = CEPH_MSGR_TAG_READY; - + set_bit(ACCEPTING, &new_con->state); prepare_write_accept_announce(msgr, new_con); - add_connection_accepting(msgr, new_con); - set_bit(WRITE_PEND, &new_con->state); /* * hand off to worker threads ,should be able to write, we want to * try to write right away, we may have missed socket state change @@ -715,7 +818,7 @@ done: /* * create a new messenger instance, creates listening socket */ -struct ceph_messenger *ceph_messenger_create() +struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr) { struct ceph_messenger *msgr; int ret = 0; @@ -735,7 +838,8 @@ struct ceph_messenger *ceph_messenger_create() kfree(msgr); return ERR_PTR(ret); } - + if (myaddr) + msgr->inst.addr.ipaddr.sin_addr = myaddr->ipaddr.sin_addr; dout(1, "ceph_messenger_create %p listening on %x:%d\n", msgr, ntohl(msgr->inst.addr.ipaddr.sin_addr.s_addr), @@ -794,13 +898,15 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg) ntohs(msg->hdr.dst.addr.ipaddr.sin_port)); } - spin_lock(&con->con_lock); + spin_lock(&con->lock); /* initiate connect? */ - if (test_bit(NEW, &con->state)) { - dout(5, "ceph_msg_send initiating connect on %p\n", con); + dout(5, "ceph_msg_send connection %p state is %u\n", con, con->state); + if (test_and_clear_bit(NEW, &con->state)) { + set_bit(CONNECTING, &con->state); + dout(5, "ceph_msg_send initiating connect on %p new state %u\n", con, con->state); ret = ceph_tcp_connect(con); - if (ret < 0){ + if (ret < 0) { derr(1, "connection failure to peer %x:%d\n", ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), ntohs(msg->hdr.dst.addr.ipaddr.sin_port)); @@ -808,16 +914,17 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg) put_connection(con); return(ret); } + prepare_write_connect(msgr, con); } /* queue */ - dout(1, "ceph_msg_send queuing outgoing message for %s%d on %p\n", + msg->hdr.seq = ++con->out_seq; + dout(1, "ceph_msg_send queuing %p seq %u for %s%d on %p\n", msg, msg->hdr.seq, ceph_name_type_str(msg->hdr.dst.name.type), msg->hdr.dst.name.num, con); ceph_msg_get(msg); + list_add_tail(&msg->list_head, &con->out_queue); - list_add(&msg->list_head, &con->out_queue); - set_bit(WRITE_PEND, &con->state); - spin_unlock(&con->con_lock); + spin_unlock(&con->lock); put_connection(con); return ret; } @@ -829,7 +936,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_of struct ceph_msg *m; int i; - m = kzalloc(sizeof(*m), GFP_KERNEL); + m = kmalloc(sizeof(*m), GFP_KERNEL); if (m == NULL) goto out; atomic_set(&m->nref, 1); @@ -853,7 +960,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_of if (m->pages[i] == NULL) goto out2; } + } else { + m->pages = 0; } + + INIT_LIST_HEAD(&m->list_head); return m; out2: @@ -864,8 +975,9 @@ out: void ceph_msg_put(struct ceph_msg *m) { + int i; if (atomic_dec_and_test(&m->nref)) { - int i; + dout(30, "ceph_msg_put last one on %p\n", m); if (m->pages) { for (i=0; inr_pages; i++) if (m->pages[i]) diff --git a/trunk/ceph/kernel/messenger.h b/trunk/ceph/kernel/messenger.h index 71c35c015ce10..5fa7c13b897eb 100644 --- a/trunk/ceph/kernel/messenger.h +++ b/trunk/ceph/kernel/messenger.h @@ -52,13 +52,13 @@ struct ceph_msg_pos { /* current state of connection */ -#define NEW 1 -#define CONNECTING 2 -#define ACCEPTING 3 -#define OPEN 4 -#define WRITE_PEND 5 -#define REJECTING 6 -#define CLOSED 7 +#define NEW 1 +#define CONNECTING 2 +#define ACCEPTING 3 +#define OPEN 4 +#define WRITE_PENDING 5 +#define REJECTING 6 +#define CLOSED 7 struct ceph_connection { struct ceph_messenger *msgr; @@ -66,7 +66,7 @@ struct ceph_connection { __u32 state; /* connection state */ atomic_t nref; - spinlock_t con_lock; /* connection lock */ + spinlock_t lock; /* connection lock */ struct list_head list_all; /* msgr->con_all */ struct list_head list_bucket; /* msgr->con_open or con_accepting */ @@ -76,6 +76,10 @@ struct ceph_connection { __u32 out_seq; /* last message queued for send */ __u32 in_seq, in_seq_acked; /* last message received, acked */ + /* connect state */ + struct ceph_entity_addr actual_peer_addr; + __u32 peer_connect_seq; + /* out queue */ struct list_head out_queue; struct ceph_msg_header out_hdr; @@ -103,7 +107,7 @@ struct ceph_connection { }; -extern struct ceph_messenger *ceph_messenger_create(void); +extern struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr); extern void ceph_messenger_destroy(struct ceph_messenger *); extern struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off); @@ -118,22 +122,22 @@ extern int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg); static __inline__ int ceph_decode_64(void **p, void *end, __u64 *v) { if (unlikely(*p + sizeof(*v) > end)) return -EINVAL; - *v = le64_to_cpu(*(__u64*)p); - p += sizeof(*v); + *v = le64_to_cpu(*(__u64*)*p); + *p += sizeof(*v); return 0; } static __inline__ int ceph_decode_32(void **p, void *end, __u32 *v) { if (unlikely(*p + sizeof(*v) > end)) return -EINVAL; - *v = le32_to_cpu(*(__u32*)p); - p += sizeof(*v); + *v = le32_to_cpu(*(__u32*)*p); + *p += sizeof(*v); return 0; } static __inline__ int ceph_decode_16(void **p, void *end, __u16 *v) { if (unlikely(*p + sizeof(*v) > end)) return -EINVAL; - *v = le16_to_cpu(*(__u16*)p); - p += sizeof(*v); + *v = le16_to_cpu(*(__u16*)*p); + *p += sizeof(*v); return 0; } static __inline__ int ceph_decode_copy(void **p, void *end, void *v, int len) { @@ -159,10 +163,10 @@ static __inline__ int ceph_decode_addr(void **p, void *end, struct ceph_entity_a static __inline__ int ceph_decode_name(void **p, void *end, struct ceph_entity_name *v) { if (unlikely(*p + sizeof(*v) > end)) return -EINVAL; - v->type = le32_to_cpu(*(__u32*)p); - p += sizeof(__u32); - v->num = le32_to_cpu(*(__u32*)p); - p += sizeof(__u32); + v->type = le32_to_cpu(*(__u32*)*p); + *p += sizeof(__u32); + v->num = le32_to_cpu(*(__u32*)*p); + *p += sizeof(__u32); return 0; } @@ -207,15 +211,15 @@ static __inline__ void ceph_decode_header(struct ceph_msg_header *to) static __inline__ int ceph_encode_64(void **p, void *end, __u64 v) { BUG_ON(*p + sizeof(v) > end); - *(__u64*)p = cpu_to_le64(v); - p += sizeof(v); + *(__u64*)*p = cpu_to_le64(v); + *p += sizeof(v); return 0; } static __inline__ int ceph_encode_32(void **p, void *end, __u32 v) { BUG_ON(*p + sizeof(v) > end); - *(__u32*)p = cpu_to_le64(v); - p += sizeof(v); + *(__u32*)*p = cpu_to_le64(v); + *p += sizeof(v); return 0; } diff --git a/trunk/ceph/kernel/osd_client.c b/trunk/ceph/kernel/osd_client.c index 35e8a51bdc6e8..b3d8d22d29c50 100644 --- a/trunk/ceph/kernel/osd_client.c +++ b/trunk/ceph/kernel/osd_client.c @@ -312,6 +312,10 @@ void ceph_osdc_init(struct ceph_osd_client *osdc) { dout(5, "ceph_osdc_init\n"); osdc->osdmap = NULL; + osdc->last_tid = 0; + INIT_RADIX_TREE(&osdc->request_tree, GFP_KERNEL); + osdc->last_requested_map = 0; + init_completion(&osdc->map_waiters); } diff --git a/trunk/ceph/kernel/super.c b/trunk/ceph/kernel/super.c index f68aa4ff2c057..ccfa7a3d11e6e 100644 --- a/trunk/ceph/kernel/super.c +++ b/trunk/ceph/kernel/super.c @@ -184,14 +184,16 @@ enum { Opt_fsidmajor, Opt_fsidminor, Opt_debug, - Opt_monport + Opt_monport, + Opt_ip }; static match_table_t arg_tokens = { {Opt_fsidmajor, "fsidmajor=%ld"}, {Opt_fsidminor, "fsidminor=%ld"}, {Opt_debug, "debug=%d"}, - {Opt_monport, "monport=%d"} + {Opt_monport, "monport=%d"}, + {Opt_ip, "ip=%s"} }; /* @@ -207,18 +209,20 @@ static int parse_ip(const char *c, int len, struct ceph_entity_addr *addr) dout(15, "parse_ip on '%s' len %d\n", c, len); for (i=0; *p && i<4; i++) { v = 0; + //dout(30, " i=%d at %s\n", i, p); while (*p && *p != '.' && p < c+len) { if (*p < '0' || *p > '9') goto bad; v = (v * 10) + (*p - '0'); p++; } + //dout(30, " v = %d\n", v); ip = (ip << 8) + v; if (!*p) break; p++; } - if (i < 4) + if (p < c+len) goto bad; *(__u32*)&addr->ipaddr.sin_addr.s_addr = htonl(ip); @@ -269,16 +273,17 @@ static int parse_mount_args(int flags, char *options, const char *dev_name, stru /* parse mount options */ while ((c = strsep(&options, ",")) != NULL) { - int token; - int intval; - int ret; + int token, intval, ret, i; if (!*c) continue; token = match_token(c, arg_tokens, argstr); - ret = match_int(&argstr[0], &intval); - if (ret < 0) { - dout(0, "bad mount arg, not int\n"); - continue; + if (token < Opt_ip) { + ret = match_int(&argstr[0], &intval); + if (ret < 0) { + dout(0, "bad mount arg, not int\n"); + continue; + } + dout(30, "got token %d intval %d\n", token, intval); } switch (token) { case Opt_fsidmajor: @@ -288,17 +293,23 @@ static int parse_mount_args(int flags, char *options, const char *dev_name, stru args->fsid.minor = intval; break; case Opt_monport: + dout(25, "parse_mount_args monport=%d\n", intval); args->mon_port = intval; + for (i=0; inum_mon; i++) + args->mon_addr[i].ipaddr.sin_port = htons(intval); break; case Opt_debug: ceph_debug = intval; break; + case Opt_ip: + parse_ip(argstr[0].from, argstr[0].to-argstr[0].from, &args->my_addr); + break; default: - derr(1, "bad mount option %s\n", c); + derr(1, "parse_mount_args bad token %d\n", token); continue; } } - + return 0; } diff --git a/trunk/ceph/kernel/super.h b/trunk/ceph/kernel/super.h index 3055e978c2513..4da42ac81aa06 100644 --- a/trunk/ceph/kernel/super.h +++ b/trunk/ceph/kernel/super.h @@ -16,6 +16,7 @@ struct ceph_mount_args { int mntflags; int flags; struct ceph_fsid fsid; + struct ceph_entity_addr my_addr; int num_mon; struct ceph_entity_addr mon_addr[5]; int mon_port; -- 2.39.5