From 67810931d0086b190bb3c4c4885e70c715bca8f2 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 26 May 2008 09:15:09 -0700 Subject: [PATCH] kclient: wrap socket in refcounting kobject --- src/TODO | 2 + src/kernel/ktcp.c | 198 ++++++++++++++++++++++++++--------------- src/kernel/ktcp.h | 25 ++++-- src/kernel/messenger.c | 99 ++++++++++++--------- src/kernel/messenger.h | 6 +- 5 files changed, 208 insertions(+), 122 deletions(-) diff --git a/src/TODO b/src/TODO index f56542fdbe243..bd65fd540eb12 100644 --- a/src/TODO +++ b/src/TODO @@ -38,6 +38,8 @@ kernel client - should we be using debugfs? - a dir for each client instance (client###)? - hooks to get mds, osd, monmap epoch #s +- clean up messenger vs ktcp + - hook into sysfs? - vfs - can we use dentry_path(), if it gets merged into mainline? - io / osd client diff --git a/src/kernel/ktcp.c b/src/kernel/ktcp.c index cf80c8e56893b..e29ee0dbdfb6d 100644 --- a/src/kernel/ktcp.c +++ b/src/kernel/ktcp.c @@ -14,6 +14,67 @@ struct workqueue_struct *recv_wq; /* receive work queue */ struct workqueue_struct *send_wq; /* send work queue */ struct workqueue_struct *accept_wq; /* accept work queue */ +struct kobject *ceph_sockets_kobj; + +/* + * sockets + */ +void ceph_socket_destroy(struct kobject *kobj) +{ + struct ceph_socket *s = container_of(kobj, struct ceph_socket, kobj); + dout(10, "socket_destroy %p\n", s); + sock_release(s->sock); + kfree(s); +} + +struct kobj_type ceph_socket_type = { + .release = ceph_socket_destroy, +}; + +struct ceph_socket *ceph_socket_create() +{ + struct ceph_socket *s; + int err; + + s = kzalloc(sizeof(*s), GFP_NOFS); + if (!s) { + derr(10, "ENOMEM creating ceph_socket\n"); + return ERR_PTR(-ENOMEM); + } + + err = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &s->sock); + if (err) { + derr(10, "sock_create_kern error %d\n", err); + return ERR_PTR(err); + } + + kobject_init_and_add(&s->kobj, &ceph_socket_type, + ceph_sockets_kobj, + "socket %p", s); + return s; +} + +void ceph_socket_get(struct ceph_socket *s) +{ + struct kobject *r; + + dout(10, "socket_get %p\n", s); + r = kobject_get(&s->kobj); + BUG_ON(!r); +} + +void ceph_socket_put(struct ceph_socket *s, int die) +{ + dout(10, "socket_put %p\n", s); + if (!s) + return; + if (die) + ceph_cancel_sock_callbacks(s); + kobject_put(&s->kobj); +} + + + /* * socket callback functions */ @@ -89,77 +150,74 @@ static void ceph_state_change(struct sock *sk) } /* make a listening socket active by setting up the data ready call back */ -static void listen_sock_callbacks(struct socket *sock, +static void listen_sock_callbacks(struct ceph_socket *s, struct ceph_messenger *msgr) { - struct sock *sk = sock->sk; + struct sock *sk = s->sock->sk; sk->sk_user_data = (void *)msgr; sk->sk_data_ready = ceph_accept_ready; } /* make a socket active by setting up the call back functions */ -static void set_sock_callbacks(struct socket *sock, struct ceph_connection *con) +static void set_sock_callbacks(struct ceph_socket *s, + struct ceph_connection *con) { - struct sock *sk = sock->sk; + struct sock *sk = s->sock->sk; sk->sk_user_data = (void *)con; sk->sk_data_ready = ceph_data_ready; sk->sk_write_space = ceph_write_space; sk->sk_state_change = ceph_state_change; } -void ceph_cancel_sock_callbacks(struct socket *sock) +void ceph_cancel_sock_callbacks(struct ceph_socket *s) { struct sock *sk; - if (!sock) + if (!s) return; - sk = sock->sk; + sk = s->sock->sk; sk->sk_user_data = 0; sk->sk_data_ready = 0; sk->sk_write_space = 0; sk->sk_state_change = 0; } -void ceph_sock_release(struct socket *sock) -{ - if (!sock) - return; - ceph_cancel_sock_callbacks(sock); - sock_release(sock); -} /* * initiate connection to a remote socket. */ -int ceph_tcp_connect(struct ceph_connection *con) +struct ceph_socket *ceph_tcp_connect(struct ceph_connection *con) { int ret; struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr; + struct ceph_socket *s; - ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock); - if (ret < 0) { - derr(1, "connect sock_create_kern error: %d\n", ret); - goto done; - } - - con->sock->sk->sk_allocation = GFP_NOFS; + s = ceph_socket_create(); + if (IS_ERR(s)) + return s; + + con->s = s; + s->sock->sk->sk_allocation = GFP_NOFS; - set_sock_callbacks(con->sock, con); + set_sock_callbacks(s, con); - ret = con->sock->ops->connect(con->sock, paddr, - sizeof(struct sockaddr_in), O_NONBLOCK); + ret = s->sock->ops->connect(s->sock, paddr, + sizeof(struct sockaddr_in), O_NONBLOCK); if (ret == -EINPROGRESS) { dout(20, "connect EINPROGRESS sk_state = = %u\n", - con->sock->sk->sk_state); - return 0; + s->sock->sk->sk_state); + ret = 0; } if (ret < 0) { /* TBD check for fatal errors, retry if not fatal.. */ derr(1, "connect %u.%u.%u.%u:%u error: %d\n", IPQUADPORT(*(struct sockaddr_in *)paddr), ret); - sock_release(con->sock); - con->sock = NULL; + ceph_socket_put(s, 1); + con->s = 0; } -done: - return ret; + + if (ret < 0) + return ERR_PTR(ret); + ceph_socket_get(s); /* for caller */ + return s; } /* @@ -168,27 +226,26 @@ done: int ceph_tcp_listen(struct ceph_messenger *msgr) { int ret; - struct socket *sock = NULL; int optval = 1; struct sockaddr_in *myaddr = &msgr->inst.addr.ipaddr; int nlen; + struct ceph_socket *s; - ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); - if (ret < 0) { - derr(0, "sock_create_kern error: %d\n", ret); - return ret; - } + s = ceph_socket_create(); + if (IS_ERR(s)) + return PTR_ERR(s); - sock->sk->sk_allocation = GFP_NOFS; + s->sock->sk->sk_allocation = GFP_NOFS; - ret = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + ret = kernel_setsockopt(s->sock, SOL_SOCKET, SO_REUSEADDR, (char *)&optval, sizeof(optval)); if (ret < 0) { derr(0, "Failed to set SO_REUSEADDR: %d\n", ret); goto err; } - ret = sock->ops->bind(sock, (struct sockaddr *)myaddr, sizeof(*myaddr)); + ret = s->sock->ops->bind(s->sock, (struct sockaddr *)myaddr, + sizeof(*myaddr)); if (ret < 0) { derr(0, "Failed to bind: %d\n", ret); goto err; @@ -196,93 +253,90 @@ int ceph_tcp_listen(struct ceph_messenger *msgr) /* what port did we bind to? */ nlen = sizeof(*myaddr); - ret = sock->ops->getname(sock, (struct sockaddr *)myaddr, &nlen, 0); + ret = s->sock->ops->getname(s->sock, (struct sockaddr *)myaddr, &nlen, + 0); if (ret < 0) { derr(0, "failed to getsockname: %d\n", ret); goto err; } dout(10, "listen on port %d\n", ntohs(myaddr->sin_port)); - ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, + ret = kernel_setsockopt(s->sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&optval, sizeof(optval)); if (ret < 0) { derr(0, "Failed to set SO_KEEPALIVE: %d\n", ret); goto err; } - msgr->listen_sock = sock; - /* TBD: probaby want to tune the backlog queue .. */ - ret = sock->ops->listen(sock, NUM_BACKUP); + ret = s->sock->ops->listen(s->sock, NUM_BACKUP); if (ret < 0) { derr(0, "kernel_listen error: %d\n", ret); - msgr->listen_sock = NULL; goto err; } - /* setup callbacks */ - listen_sock_callbacks(msgr->listen_sock, msgr); - + /* ok! */ + msgr->listen_s = s; + listen_sock_callbacks(s, msgr); return ret; + err: - sock_release(sock); + ceph_socket_put(s, 0); return ret; } /* * accept a connection */ -int ceph_tcp_accept(struct socket *sock, struct ceph_connection *con) +int ceph_tcp_accept(struct ceph_socket *ls, struct ceph_connection *con) { int ret; struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr; int len; + struct ceph_socket *s; + + s = ceph_socket_create(); + if (IS_ERR(s)) + return PTR_ERR(s); + con->s = s; + s->sock->sk->sk_allocation = GFP_NOFS; - ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock); - if (ret < 0) { - derr(0, "sock_create_kern error: %d\n", ret); - goto done; - } - - con->sock->sk->sk_allocation = GFP_NOFS; - - ret = sock->ops->accept(sock, con->sock, O_NONBLOCK); - /* ret = kernel_accept(sock, &new_sock, sock->file->f_flags); */ + ret = ls->sock->ops->accept(ls->sock, s->sock, O_NONBLOCK); if (ret < 0) { derr(0, "accept error: %d\n", ret); goto err; } /* setup callbacks */ - set_sock_callbacks(con->sock, con); + set_sock_callbacks(s, con); - con->sock->ops = sock->ops; - con->sock->type = sock->type; - ret = con->sock->ops->getname(con->sock, paddr, &len, 2); + s->sock->ops = ls->sock->ops; + s->sock->type = ls->sock->type; + ret = s->sock->ops->getname(s->sock, paddr, &len, 2); if (ret < 0) { derr(0, "getname error: %d\n", ret); goto err; } - -done: return ret; + err: - sock_release(con->sock); + ceph_socket_put(s, 0); + con->s = 0; return ret; } /* * receive a message this may return after partial send */ -int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) +int ceph_tcp_recvmsg(struct ceph_socket *s, void *buf, size_t len) { struct kvec iov = {buf, len}; struct msghdr msg = {.msg_flags = 0}; int rlen = 0; /* length read */ msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL; - rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags); + rlen = kernel_recvmsg(s->sock, &msg, &iov, 1, len, msg.msg_flags); return(rlen); } @@ -290,7 +344,7 @@ int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) /* * Send a message this may return after partial send */ -int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, +int ceph_tcp_sendmsg(struct ceph_socket *s, struct kvec *iov, size_t kvlen, size_t len, int more) { struct msghdr msg = {.msg_flags = 0}; @@ -303,7 +357,7 @@ int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */ /*printk(KERN_DEBUG "before sendmsg %d\n", len);*/ - rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len); + rlen = kernel_sendmsg(s->sock, &msg, iov, kvlen, len); /*printk(KERN_DEBUG "after sendmsg %d\n", rlen);*/ return(rlen); } diff --git a/src/kernel/ktcp.h b/src/kernel/ktcp.h index ff5115311c41a..840c8d5a8865b 100644 --- a/src/kernel/ktcp.h +++ b/src/kernel/ktcp.h @@ -4,19 +4,28 @@ extern struct workqueue_struct *recv_wq; /* receive work queue */ extern struct workqueue_struct *send_wq; /* send work queue */ +/* wrap socket, since we use/drop it from multiple threads */ +extern struct kobject *ceoh_sockets_kobj; + +struct ceph_socket { + struct kobject kobj; + struct socket *sock; +}; + /* prototype definitions */ -int ceph_tcp_connect(struct ceph_connection *); +struct ceph_socket *ceph_tcp_connect(struct ceph_connection *); int ceph_tcp_listen(struct ceph_messenger *); -int ceph_tcp_accept(struct socket *, struct ceph_connection *); -int ceph_tcp_recvmsg(struct socket *, void *, size_t ); -int ceph_tcp_sendmsg(struct socket *, struct kvec *, size_t, size_t, int more); -void ceph_cancel_sock_callbacks(struct socket *); -void ceph_sock_release(struct socket *); +int ceph_tcp_accept(struct ceph_socket *, struct ceph_connection *); +int ceph_tcp_recvmsg(struct ceph_socket *, void *, size_t ); +int ceph_tcp_sendmsg(struct ceph_socket *, struct kvec *, size_t, size_t, int more); +void ceph_cancel_sock_callbacks(struct ceph_socket *); int ceph_workqueue_init(void); void ceph_workqueue_shutdown(void); -/* Well known port for ceph client listener.. */ -#define CEPH_PORT 2002 +extern struct ceph_socket *ceph_socket_create(void); +extern void ceph_socket_get(struct ceph_socket *s); +extern void ceph_socket_put(struct ceph_socket *s, int die); + /* Max number of outstanding connections in listener queueu */ #define NUM_BACKUP 10 #endif diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 7c91e9c972d0a..bd32cd321b2f6 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -28,6 +28,8 @@ static void try_accept(struct work_struct *); + + /* * connections */ @@ -118,7 +120,7 @@ static void put_connection(struct ceph_connection *con) dout(20, "put_connection destroying %p\n", con); ceph_msg_put_list(&con->out_queue); ceph_msg_put_list(&con->out_sent); - ceph_sock_release(con->sock); + ceph_socket_put(con->s, 1); kfree(con); } } @@ -275,8 +277,8 @@ static void ceph_fault(struct ceph_connection *con) if (con->delay) { dout(30, "fault tcp_close delay != 0\n"); - ceph_sock_release(con->sock); - con->sock = NULL; + ceph_socket_put(con->s, 1); + con->s = NULL; set_bit(NEW, &con->state); /* @@ -328,13 +330,14 @@ static void ceph_fault(struct ceph_connection *con) * 0 -> socket full, but more to do * <0 -> error */ -static int write_partial_kvec(struct ceph_connection *con) +static int write_partial_kvec(struct ceph_connection *con, + struct ceph_socket *s) { int ret; dout(10, "write_partial_kvec have %d left\n", con->out_kvec_bytes); while (con->out_kvec_bytes > 0) { - ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, + ret = ceph_tcp_sendmsg(s, con->out_kvec_cur, con->out_kvec_left, con->out_kvec_bytes, con->out_more); if (ret <= 0) @@ -364,6 +367,7 @@ out: } static int write_partial_msg_pages(struct ceph_connection *con, + struct ceph_socket *s, struct ceph_msg *msg) { struct kvec kv; @@ -389,7 +393,7 @@ static int write_partial_msg_pages(struct ceph_connection *con, kv.iov_base = kaddr + con->out_msg_pos.page_pos; kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos), (int)(data_len - con->out_msg_pos.data_pos)); - ret = ceph_tcp_sendmsg(con->sock, &kv, 1, kv.iov_len, 1); + ret = ceph_tcp_sendmsg(s, &kv, 1, kv.iov_len, 1); if (msg->pages) kunmap(page); mutex_unlock(&msg->page_mutex); @@ -574,9 +578,12 @@ static void try_write(struct work_struct *work) container_of(work, struct ceph_connection, swork.work); struct ceph_messenger *msgr = con->msgr; int ret = 1; + struct ceph_socket *s = con->s; dout(30, "try_write start %p state %lu nref %d\n", con, con->state, atomic_read(&con->nref)); + if (s) + ceph_socket_get(s); if (test_bit(CLOSED, &con->state)) { dout(5, "try_write closed\n"); @@ -600,8 +607,10 @@ more: con->in_tag = CEPH_MSGR_TAG_READY; dout(5, "try_write initiating connect on %p new state %lu\n", con, con->state); - ret = ceph_tcp_connect(con); - if (ret < 0) { + BUG_ON(s); + s = ceph_tcp_connect(con); + dout(10, "tcp_connect returned %p\n", s); + if (IS_ERR(s)) { con->error_msg = "connect error"; ceph_fault(con); goto done; @@ -611,7 +620,7 @@ more: /* kvec data queued? */ more_kvec: if (con->out_kvec_left) { - ret = write_partial_kvec(con); + ret = write_partial_kvec(con, s); if (ret == 0) goto done; if (ret < 0) { @@ -622,7 +631,7 @@ more_kvec: /* msg pages? */ if (con->out_msg && con->out_msg->nr_pages) { - ret = write_partial_msg_pages(con, con->out_msg); + ret = write_partial_msg_pages(con, s, con->out_msg); if (ret == 1) goto more_kvec; if (ret == 0) @@ -655,6 +664,7 @@ more_kvec: done: dout(30, "try_write done on %p\n", con); + ceph_socket_put(s, 0); put_connection(con); return; } @@ -681,7 +691,8 @@ static int prepare_read_message(struct ceph_connection *con) /* * read (part of) a message */ -static int read_message_partial(struct ceph_connection *con) +static int read_message_partial(struct ceph_connection *con, + struct ceph_socket *s) { struct ceph_msg *m = con->in_msg; void *p; @@ -694,7 +705,7 @@ static int read_message_partial(struct ceph_connection *con) /* header */ while (con->in_base_pos < sizeof(m->hdr)) { left = sizeof(m->hdr) - con->in_base_pos; - ret = ceph_tcp_recvmsg(con->sock, &m->hdr + con->in_base_pos, + ret = ceph_tcp_recvmsg(s, &m->hdr + con->in_base_pos, left); if (ret <= 0) return ret; @@ -710,7 +721,7 @@ static int read_message_partial(struct ceph_connection *con) return -ENOMEM; } left = front_len - m->front.iov_len; - ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base + + ret = ceph_tcp_recvmsg(s, (char *)m->front.iov_base + m->front.iov_len, left); if (ret <= 0) return ret; @@ -757,7 +768,7 @@ static int read_message_partial(struct ceph_connection *con) return 0; } p = kmap(m->pages[con->in_msg_pos.page]); - ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, + ret = ceph_tcp_recvmsg(s, p + con->in_msg_pos.page_pos, left); kunmap(m->pages[con->in_msg_pos.page]); mutex_unlock(&m->page_mutex); @@ -774,7 +785,7 @@ static int read_message_partial(struct ceph_connection *con) /* footer */ while (con->in_base_pos < sizeof(m->hdr) + sizeof(m->footer)) { left = sizeof(m->hdr) + sizeof(m->footer) - con->in_base_pos; - ret = ceph_tcp_recvmsg(con->sock, &m->footer + + ret = ceph_tcp_recvmsg(s, &m->footer + (con->in_base_pos - sizeof(m->hdr)), left); if (ret <= 0) @@ -835,11 +846,12 @@ static void prepare_read_ack(struct ceph_connection *con) /* * read (part of) an ack */ -static int read_ack_partial(struct ceph_connection *con) +static int read_ack_partial(struct ceph_connection *con, + struct ceph_socket *s) { while (con->in_base_pos < sizeof(con->in_partial_ack)) { int left = sizeof(con->in_partial_ack) - con->in_base_pos; - int ret = ceph_tcp_recvmsg(con->sock, + int ret = ceph_tcp_recvmsg(s, (char *)&con->in_partial_ack + con->in_base_pos, left); if (ret <= 0) @@ -874,7 +886,8 @@ static void process_ack(struct ceph_connection *con) /* * read portion of connect-side handshake on a new connection */ -static int read_connect_partial(struct ceph_connection *con) +static int read_connect_partial(struct ceph_connection *con, + struct ceph_socket *s) { int ret, to; dout(20, "read_connect_partial %p at %d\n", con, con->in_base_pos); @@ -884,8 +897,7 @@ static int read_connect_partial(struct ceph_connection *con) while (con->in_base_pos < to) { int left = to - con->in_base_pos; int have = con->in_base_pos; - ret = ceph_tcp_recvmsg(con->sock, - (char *)&con->actual_peer_addr + have, + ret = ceph_tcp_recvmsg(s, (char *)&con->actual_peer_addr + have, left); if (ret <= 0) goto out; @@ -895,7 +907,7 @@ static int read_connect_partial(struct ceph_connection *con) /* in_tag */ to += 1; if (con->in_base_pos < to) { - ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); + ret = ceph_tcp_recvmsg(s, &con->in_tag, 1); if (ret <= 0) goto out; con->in_base_pos += ret; @@ -907,7 +919,7 @@ static int read_connect_partial(struct ceph_connection *con) if (con->in_base_pos < to) { int left = to - con->in_base_pos; int have = sizeof(con->in_connect_seq) - left; - ret = ceph_tcp_recvmsg(con->sock, + ret = ceph_tcp_recvmsg(s, (char *)&con->in_connect_seq + have, left); if (ret <= 0) @@ -982,8 +994,8 @@ static void process_connect(struct ceph_connection *con) case CEPH_MSGR_TAG_WAIT: dout(10, "process_connect peer connecting WAIT\n"); set_bit(WAIT, &con->state); - ceph_sock_release(con->sock); - con->sock = NULL; + ceph_socket_put(con->s, 1); + con->s = NULL; break; case CEPH_MSGR_TAG_READY: dout(10, "process_connect got READY, now open\n"); @@ -1004,7 +1016,8 @@ static void process_connect(struct ceph_connection *con) /* * read portion of accept-side handshake on a newly accepted connection */ -static int read_accept_partial(struct ceph_connection *con) +static int read_accept_partial(struct ceph_connection *con, + struct ceph_socket *s) { int ret; int to; @@ -1014,7 +1027,7 @@ static int read_accept_partial(struct ceph_connection *con) while (con->in_base_pos < to) { int left = to - con->in_base_pos; int have = con->in_base_pos; - ret = ceph_tcp_recvmsg(con->sock, + ret = ceph_tcp_recvmsg(s, (char *)&con->peer_addr + have, left); if (ret <= 0) return ret; @@ -1026,7 +1039,7 @@ static int read_accept_partial(struct ceph_connection *con) while (con->in_base_pos < to) { int left = to - con->in_base_pos; int have = sizeof(con->peer_addr) - left; - ret = ceph_tcp_recvmsg(con->sock, + ret = ceph_tcp_recvmsg(s, (char *)&con->in_connect_seq + have, left); if (ret <= 0) @@ -1152,12 +1165,17 @@ static void process_accept(struct ceph_connection *con) */ static void try_read(struct work_struct *work) { - int ret = -1; - struct ceph_connection *con; + struct ceph_connection *con = container_of(work, struct ceph_connection, + rwork); struct ceph_messenger *msgr; + struct ceph_socket *s; + int ret = -1; - con = container_of(work, struct ceph_connection, rwork); dout(20, "try_read start on %p\n", con); + s = con->s; + if (!s) + goto out; + ceph_socket_get(s); msgr = con->msgr; retry: @@ -1174,7 +1192,7 @@ more: } if (test_bit(ACCEPTING, &con->state)) { dout(20, "try_read accepting\n"); - ret = read_accept_partial(con); + ret = read_accept_partial(con, s); if (ret <= 0) goto done; process_accept(con); /* accepted */ @@ -1182,7 +1200,7 @@ more: } if (test_bit(CONNECTING, &con->state)) { dout(20, "try_read connecting\n"); - ret = read_connect_partial(con); + ret = read_connect_partial(con, s); if (ret <= 0) goto done; process_connect(con); @@ -1194,7 +1212,7 @@ more: static char buf[1024]; int skip = min(1024, -con->in_base_pos); dout(20, "skipping %d / %d bytes\n", skip, -con->in_base_pos); - ret = ceph_tcp_recvmsg(con->sock, buf, skip); + ret = ceph_tcp_recvmsg(s, buf, skip); if (ret <= 0) goto done; con->in_base_pos += ret; @@ -1202,7 +1220,7 @@ more: goto more; } if (con->in_tag == CEPH_MSGR_TAG_READY) { - ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); + ret = ceph_tcp_recvmsg(s, &con->in_tag, 1); if (ret <= 0) goto done; dout(30, "try_read got tag %d\n", (int)con->in_tag); @@ -1221,7 +1239,7 @@ more: } } if (con->in_tag == CEPH_MSGR_TAG_MSG) { - ret = read_message_partial(con); + ret = read_message_partial(con, s); if (ret <= 0) goto done; if (con->in_tag == CEPH_MSGR_TAG_READY) @@ -1230,7 +1248,7 @@ more: goto more; } if (con->in_tag == CEPH_MSGR_TAG_ACK) { - ret = read_ack_partial(con); + ret = read_ack_partial(con, s); if (ret <= 0) goto done; process_ack(con); @@ -1251,6 +1269,7 @@ done: out: dout(20, "try_read done on %p\n", con); + ceph_socket_put(s, 0); put_connection(con); return; } @@ -1280,7 +1299,7 @@ static void try_accept(struct work_struct *work) clear_bit(NEW, &new_con->state); new_con->in_tag = CEPH_MSGR_TAG_READY; /* eventually, hopefully */ - if (ceph_tcp_accept(msgr->listen_sock, new_con) < 0) { + if (ceph_tcp_accept(msgr->listen_s, new_con) < 0) { derr(1, "error accepting connection\n"); put_connection(new_con); goto done; @@ -1344,6 +1363,7 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr) dout(1, "messenger %p listening on %u.%u.%u.%u:%u\n", msgr, IPQUADPORT(msgr->inst.addr.ipaddr)); + return msgr; } @@ -1354,9 +1374,8 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr) dout(2, "destroy %p\n", msgr); /* stop listener */ - ceph_cancel_sock_callbacks(msgr->listen_sock); + ceph_socket_put(msgr->listen_s, 1); cancel_work_sync(&msgr->awork); - ceph_sock_release(msgr->listen_sock); /* kill off connections */ spin_lock(&msgr->con_lock); @@ -1370,7 +1389,7 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr) /* in case there's queued work... */ spin_unlock(&msgr->con_lock); - ceph_cancel_sock_callbacks(con->sock); + ceph_cancel_sock_callbacks(con->s); cancel_work_sync(&con->rwork); cancel_delayed_work_sync(&con->swork); put_connection(con); diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index 356b8faf92283..5c3d955e10825 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -1,6 +1,7 @@ #ifndef __FS_CEPH_MESSENGER_H #define __FS_CEPH_MESSENGER_H +#include #include #include #include @@ -39,7 +40,7 @@ struct ceph_messenger { ceph_msgr_peer_reset_t peer_reset; ceph_msgr_prepare_pages_t prepare_pages; struct ceph_entity_inst inst; /* my name+address */ - struct socket *listen_sock; /* listening socket */ + struct ceph_socket *listen_s; /* listening socket */ struct work_struct awork; /* accept work */ spinlock_t con_lock; struct list_head con_all; /* all connections */ @@ -68,6 +69,7 @@ struct ceph_msg_pos { #define BASE_DELAY_INTERVAL (HZ/2) #define MAX_DELAY_INTERVAL (5U * 60 * HZ) + /* ceph_connection state bit flags */ #define NEW 0 #define CONNECTING 1 @@ -84,7 +86,7 @@ struct ceph_msg_pos { struct ceph_connection { struct ceph_messenger *msgr; - struct socket *sock; /* connection socket */ + struct ceph_socket *s; /* connection socket */ unsigned long state; /* connection state */ const char *error_msg; -- 2.39.5