]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: wrap socket in refcounting kobject
authorSage Weil <sage@newdream.net>
Mon, 26 May 2008 16:15:09 +0000 (09:15 -0700)
committerSage Weil <sage@newdream.net>
Mon, 26 May 2008 16:15:09 +0000 (09:15 -0700)
src/TODO
src/kernel/ktcp.c
src/kernel/ktcp.h
src/kernel/messenger.c
src/kernel/messenger.h

index f56542fdbe24327c92d10df80c71b8ee8c127448..bd65fd540eb12795b5ec38b85c9dfd62cd36708c 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -38,6 +38,8 @@ kernel client
     - should we be using debugfs?
   - a dir for each client instance (client###)?
   - hooks to get mds, osd, monmap epoch #s
+- clean up messenger vs ktcp
+ - hook into sysfs?
 - vfs
  - can we use dentry_path(), if it gets merged into mainline?
 - io / osd client
index cf80c8e56893b4ac60e069cfe435bc027156f760..e29ee0dbdfb6dfba7224f9c29cd1659f6e667aca 100644 (file)
@@ -14,6 +14,67 @@ struct workqueue_struct *recv_wq;    /* receive work queue */
 struct workqueue_struct *send_wq;      /* send work queue */
 struct workqueue_struct *accept_wq;    /* accept work queue */
 
+struct kobject *ceph_sockets_kobj;
+
+/*
+ * sockets
+ */
+void ceph_socket_destroy(struct kobject *kobj)
+{
+       struct ceph_socket *s = container_of(kobj, struct ceph_socket, kobj);
+       dout(10, "socket_destroy %p\n", s);
+       sock_release(s->sock);
+       kfree(s);
+}
+
+struct kobj_type ceph_socket_type = {
+       .release = ceph_socket_destroy,
+};
+
+struct ceph_socket *ceph_socket_create()
+{
+       struct ceph_socket *s;
+       int err;
+       
+       s = kzalloc(sizeof(*s), GFP_NOFS);
+       if (!s) {
+               derr(10, "ENOMEM creating ceph_socket\n");
+               return ERR_PTR(-ENOMEM);
+       }
+
+       err = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &s->sock);
+       if (err) {
+               derr(10, "sock_create_kern error %d\n", err);
+               return ERR_PTR(err);
+       }
+
+       kobject_init_and_add(&s->kobj, &ceph_socket_type,
+                            ceph_sockets_kobj,
+                            "socket %p", s);
+       return s;
+}
+
+void ceph_socket_get(struct ceph_socket *s)
+{
+       struct kobject *r;
+
+       dout(10, "socket_get %p\n", s);
+       r = kobject_get(&s->kobj);
+       BUG_ON(!r);
+}
+
+void ceph_socket_put(struct ceph_socket *s, int die)
+{
+       dout(10, "socket_put %p\n", s);
+       if (!s)
+               return;
+       if (die)
+               ceph_cancel_sock_callbacks(s);
+       kobject_put(&s->kobj);
+}
+
+
+
 /*
  * socket callback functions
  */
@@ -89,77 +150,74 @@ static void ceph_state_change(struct sock *sk)
 }
 
 /* make a listening socket active by setting up the data ready call back */
-static void listen_sock_callbacks(struct socket *sock,
+static void listen_sock_callbacks(struct ceph_socket *s,
                                  struct ceph_messenger *msgr)
 {
-       struct sock *sk = sock->sk;
+       struct sock *sk = s->sock->sk;
        sk->sk_user_data = (void *)msgr;
        sk->sk_data_ready = ceph_accept_ready;
 }
 
 /* make a socket active by setting up the call back functions */
-static void set_sock_callbacks(struct socket *sock, struct ceph_connection *con)
+static void set_sock_callbacks(struct ceph_socket *s,
+                              struct ceph_connection *con)
 {
-       struct sock *sk = sock->sk;
+       struct sock *sk = s->sock->sk;
        sk->sk_user_data = (void *)con;
        sk->sk_data_ready = ceph_data_ready;
        sk->sk_write_space = ceph_write_space;
        sk->sk_state_change = ceph_state_change;
 }
 
-void ceph_cancel_sock_callbacks(struct socket *sock)
+void ceph_cancel_sock_callbacks(struct ceph_socket *s)
 {
        struct sock *sk;
-       if (!sock)
+       if (!s)
                return;
-       sk = sock->sk;
+       sk = s->sock->sk;
        sk->sk_user_data = 0;
        sk->sk_data_ready = 0;
        sk->sk_write_space = 0;
        sk->sk_state_change = 0;
 }
-void ceph_sock_release(struct socket *sock)
-{
-       if (!sock)
-               return;
-       ceph_cancel_sock_callbacks(sock);
-       sock_release(sock);
-}
 
 /*
  * initiate connection to a remote socket.
  */
-int ceph_tcp_connect(struct ceph_connection *con)
+struct ceph_socket *ceph_tcp_connect(struct ceph_connection *con)
 {
        int ret;
        struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
+       struct ceph_socket *s;
 
-       ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock);
-       if (ret < 0) {
-               derr(1, "connect sock_create_kern error: %d\n", ret);
-               goto done;
-       }
-       
-       con->sock->sk->sk_allocation = GFP_NOFS;
+       s = ceph_socket_create();
+       if (IS_ERR(s))
+               return s;
+
+       con->s = s;
+       s->sock->sk->sk_allocation = GFP_NOFS;
 
-       set_sock_callbacks(con->sock, con);
+       set_sock_callbacks(s, con);
 
-       ret = con->sock->ops->connect(con->sock, paddr,
-                                     sizeof(struct sockaddr_in), O_NONBLOCK);
+       ret = s->sock->ops->connect(s->sock, paddr,
+                                   sizeof(struct sockaddr_in), O_NONBLOCK);
        if (ret == -EINPROGRESS) {
                dout(20, "connect EINPROGRESS sk_state = = %u\n",
-                    con->sock->sk->sk_state);
-               return 0;
+                    s->sock->sk->sk_state);
+               ret = 0;
        }
        if (ret < 0) {
                /* TBD check for fatal errors, retry if not fatal.. */
                derr(1, "connect %u.%u.%u.%u:%u error: %d\n",
                     IPQUADPORT(*(struct sockaddr_in *)paddr), ret);
-               sock_release(con->sock);
-               con->sock = NULL;
+               ceph_socket_put(s, 1);
+               con->s = 0;
        }
-done:
-       return ret;
+
+       if (ret < 0)
+               return ERR_PTR(ret);
+       ceph_socket_get(s); /* for caller */
+       return s;
 }
 
 /*
@@ -168,27 +226,26 @@ done:
 int ceph_tcp_listen(struct ceph_messenger *msgr)
 {
        int ret;
-       struct socket *sock = NULL;
        int optval = 1;
        struct sockaddr_in *myaddr = &msgr->inst.addr.ipaddr;
        int nlen;
+       struct ceph_socket *s;
 
-       ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
-       if (ret < 0) {
-               derr(0, "sock_create_kern error: %d\n", ret);
-               return ret;
-       }
+       s = ceph_socket_create();
+       if (IS_ERR(s))
+               return PTR_ERR(s);
 
-       sock->sk->sk_allocation = GFP_NOFS;
+       s->sock->sk->sk_allocation = GFP_NOFS;
 
-       ret = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+       ret = kernel_setsockopt(s->sock, SOL_SOCKET, SO_REUSEADDR,
                                (char *)&optval, sizeof(optval));
        if (ret < 0) {
                derr(0, "Failed to set SO_REUSEADDR: %d\n", ret);
                goto err;
        }
 
-       ret = sock->ops->bind(sock, (struct sockaddr *)myaddr, sizeof(*myaddr));
+       ret = s->sock->ops->bind(s->sock, (struct sockaddr *)myaddr,
+                                sizeof(*myaddr));
        if (ret < 0) {
                derr(0, "Failed to bind: %d\n", ret);
                goto err;
@@ -196,93 +253,90 @@ int ceph_tcp_listen(struct ceph_messenger *msgr)
 
        /* what port did we bind to? */
        nlen = sizeof(*myaddr);
-       ret = sock->ops->getname(sock, (struct sockaddr *)myaddr, &nlen, 0);
+       ret = s->sock->ops->getname(s->sock, (struct sockaddr *)myaddr, &nlen,
+                                   0);
        if (ret < 0) {
                derr(0, "failed to getsockname: %d\n", ret);
                goto err;
        }
        dout(10, "listen on port %d\n", ntohs(myaddr->sin_port));
 
-       ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
+       ret = kernel_setsockopt(s->sock, SOL_SOCKET, SO_KEEPALIVE,
                                (char *)&optval, sizeof(optval));
        if (ret < 0) {
                derr(0, "Failed to set SO_KEEPALIVE: %d\n", ret);
                goto err;
        }
 
-       msgr->listen_sock = sock;
-
        /* TBD: probaby want to tune the backlog queue .. */
-       ret = sock->ops->listen(sock, NUM_BACKUP);
+       ret = s->sock->ops->listen(s->sock, NUM_BACKUP);
        if (ret < 0) {
                derr(0, "kernel_listen error: %d\n", ret);
-               msgr->listen_sock = NULL;
                goto err;
        }
 
-       /* setup callbacks */
-       listen_sock_callbacks(msgr->listen_sock, msgr);
-
+       /* ok! */
+       msgr->listen_s = s;
+       listen_sock_callbacks(s, msgr);
        return ret;
+
 err:
-       sock_release(sock);
+       ceph_socket_put(s, 0);
        return ret;
 }
 
 /*
  *  accept a connection
  */
-int ceph_tcp_accept(struct socket *sock, struct ceph_connection *con)
+int ceph_tcp_accept(struct ceph_socket *ls, struct ceph_connection *con)
 {
        int ret;
        struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
        int len;
+       struct ceph_socket *s;
+       
+       s = ceph_socket_create();
+       if (IS_ERR(s))
+               return PTR_ERR(s);
+       con->s = s;
 
+       s->sock->sk->sk_allocation = GFP_NOFS;
 
-       ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock);
-       if (ret < 0) {
-               derr(0, "sock_create_kern error: %d\n", ret);
-               goto done;
-       }
-
-       con->sock->sk->sk_allocation = GFP_NOFS;
-
-       ret = sock->ops->accept(sock, con->sock, O_NONBLOCK);
-       /* ret = kernel_accept(sock, &new_sock, sock->file->f_flags); */
+       ret = ls->sock->ops->accept(ls->sock, s->sock, O_NONBLOCK);
        if (ret < 0) {
                derr(0, "accept error: %d\n", ret);
                goto err;
        }
 
        /* setup callbacks */
-       set_sock_callbacks(con->sock, con);
+       set_sock_callbacks(s, con);
 
-       con->sock->ops = sock->ops;
-       con->sock->type = sock->type;
-       ret = con->sock->ops->getname(con->sock, paddr, &len, 2);
+       s->sock->ops = ls->sock->ops;
+       s->sock->type = ls->sock->type;
+       ret = s->sock->ops->getname(s->sock, paddr, &len, 2);
        if (ret < 0) {
                derr(0, "getname error: %d\n", ret);
                goto err;
        }
-
-done:
        return ret;
+
 err:
-       sock_release(con->sock);
+       ceph_socket_put(s, 0);
+       con->s = 0;
        return ret;
 }
 
 /*
  * receive a message this may return after partial send
  */
-int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
+int ceph_tcp_recvmsg(struct ceph_socket *s, void *buf, size_t len)
 {
        struct kvec iov = {buf, len};
        struct msghdr msg = {.msg_flags = 0};
        int rlen = 0;           /* length read */
 
        msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
-       rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
+       rlen = kernel_recvmsg(s->sock, &msg, &iov, 1, len, msg.msg_flags);
        return(rlen);
 }
 
@@ -290,7 +344,7 @@ int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
 /*
  * Send a message this may return after partial send
  */
-int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
+int ceph_tcp_sendmsg(struct ceph_socket *s, struct kvec *iov,
                     size_t kvlen, size_t len, int more)
 {
        struct msghdr msg = {.msg_flags = 0};
@@ -303,7 +357,7 @@ int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
                msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
 
        /*printk(KERN_DEBUG "before sendmsg %d\n", len);*/
-       rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len);
+       rlen = kernel_sendmsg(s->sock, &msg, iov, kvlen, len);
        /*printk(KERN_DEBUG "after sendmsg %d\n", rlen);*/
        return(rlen);
 }
index ff5115311c41a6092f1c6ee01a1004fe782da2bb..840c8d5a8865b8e7dbc48f581e4e224371ae02b3 100644 (file)
@@ -4,19 +4,28 @@
 extern struct workqueue_struct *recv_wq;       /* receive work queue */
 extern struct workqueue_struct *send_wq;       /* send work queue */
 
+/* wrap socket, since we use/drop it from multiple threads */
+extern struct kobject *ceoh_sockets_kobj;
+
+struct ceph_socket {
+       struct kobject kobj;
+       struct socket *sock;
+};
+
 /* prototype definitions */
-int ceph_tcp_connect(struct ceph_connection *);
+struct ceph_socket *ceph_tcp_connect(struct ceph_connection *);
 int ceph_tcp_listen(struct ceph_messenger *);
-int ceph_tcp_accept(struct socket *, struct ceph_connection *);
-int ceph_tcp_recvmsg(struct socket *, void *, size_t );
-int ceph_tcp_sendmsg(struct socket *, struct kvec *, size_t, size_t, int more);
-void ceph_cancel_sock_callbacks(struct socket *);
-void ceph_sock_release(struct socket *);
+int ceph_tcp_accept(struct ceph_socket *, struct ceph_connection *);
+int ceph_tcp_recvmsg(struct ceph_socket *, void *, size_t );
+int ceph_tcp_sendmsg(struct ceph_socket *, struct kvec *, size_t, size_t, int more);
+void ceph_cancel_sock_callbacks(struct ceph_socket *);
 int ceph_workqueue_init(void);
 void ceph_workqueue_shutdown(void);
 
-/* Well known port for ceph client listener.. */
-#define CEPH_PORT 2002
+extern struct ceph_socket *ceph_socket_create(void);
+extern void ceph_socket_get(struct ceph_socket *s);
+extern void ceph_socket_put(struct ceph_socket *s, int die);
+
 /* Max number of outstanding connections in listener queueu */
 #define NUM_BACKUP 10
 #endif
index 7c91e9c972d0a47b1dac456a2789ad9e70c007e6..bd32cd321b2f6366e9f14442fe29627201ceb7e5 100644 (file)
@@ -28,6 +28,8 @@ static void try_accept(struct work_struct *);
 
 
 
+
+
 /*
  * connections
  */
@@ -118,7 +120,7 @@ static void put_connection(struct ceph_connection *con)
                dout(20, "put_connection destroying %p\n", con);
                ceph_msg_put_list(&con->out_queue);
                ceph_msg_put_list(&con->out_sent);
-               ceph_sock_release(con->sock);
+               ceph_socket_put(con->s, 1);
                kfree(con);
        }
 }
@@ -275,8 +277,8 @@ static void ceph_fault(struct ceph_connection *con)
 
        if (con->delay) {
                dout(30, "fault tcp_close delay != 0\n");
-               ceph_sock_release(con->sock);
-               con->sock = NULL;
+               ceph_socket_put(con->s, 1);
+               con->s = NULL;
                set_bit(NEW, &con->state);
 
                /*
@@ -328,13 +330,14 @@ static void ceph_fault(struct ceph_connection *con)
  *  0 -> socket full, but more to do
  * <0 -> error
  */
-static int write_partial_kvec(struct ceph_connection *con)
+static int write_partial_kvec(struct ceph_connection *con,
+                             struct ceph_socket *s)
 {
        int ret;
 
        dout(10, "write_partial_kvec have %d left\n", con->out_kvec_bytes);
        while (con->out_kvec_bytes > 0) {
-               ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
+               ret = ceph_tcp_sendmsg(s, con->out_kvec_cur,
                                       con->out_kvec_left, con->out_kvec_bytes,
                                       con->out_more);
                if (ret <= 0)
@@ -364,6 +367,7 @@ out:
 }
 
 static int write_partial_msg_pages(struct ceph_connection *con,
+                                  struct ceph_socket *s,
                                   struct ceph_msg *msg)
 {
        struct kvec kv;
@@ -389,7 +393,7 @@ static int write_partial_msg_pages(struct ceph_connection *con,
                kv.iov_base = kaddr + con->out_msg_pos.page_pos;
                kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
                                 (int)(data_len - con->out_msg_pos.data_pos));
-               ret = ceph_tcp_sendmsg(con->sock, &kv, 1, kv.iov_len, 1);
+               ret = ceph_tcp_sendmsg(s, &kv, 1, kv.iov_len, 1);
                if (msg->pages)
                        kunmap(page);
                mutex_unlock(&msg->page_mutex);
@@ -574,9 +578,12 @@ static void try_write(struct work_struct *work)
                container_of(work, struct ceph_connection, swork.work);
        struct ceph_messenger *msgr = con->msgr;
        int ret = 1;
+       struct ceph_socket *s = con->s;
 
        dout(30, "try_write start %p state %lu nref %d\n", con, con->state,
             atomic_read(&con->nref));
+       if (s)
+               ceph_socket_get(s);
 
        if (test_bit(CLOSED, &con->state)) {
                dout(5, "try_write closed\n");
@@ -600,8 +607,10 @@ more:
                con->in_tag = CEPH_MSGR_TAG_READY;
                dout(5, "try_write initiating connect on %p new state %lu\n",
                     con, con->state);
-               ret = ceph_tcp_connect(con);
-               if (ret < 0) {
+               BUG_ON(s);
+               s = ceph_tcp_connect(con);
+               dout(10, "tcp_connect returned %p\n", s);
+               if (IS_ERR(s)) {
                        con->error_msg = "connect error";
                        ceph_fault(con);
                        goto done;
@@ -611,7 +620,7 @@ more:
        /* kvec data queued? */
 more_kvec:
        if (con->out_kvec_left) {
-               ret = write_partial_kvec(con);
+               ret = write_partial_kvec(con, s);
                if (ret == 0)
                        goto done;
                if (ret < 0) {
@@ -622,7 +631,7 @@ more_kvec:
 
        /* msg pages? */
        if (con->out_msg && con->out_msg->nr_pages) {
-               ret = write_partial_msg_pages(con, con->out_msg);
+               ret = write_partial_msg_pages(con, s, con->out_msg);
                if (ret == 1)
                        goto more_kvec;
                if (ret == 0)
@@ -655,6 +664,7 @@ more_kvec:
 
 done:
        dout(30, "try_write done on %p\n", con);
+       ceph_socket_put(s, 0);
        put_connection(con);
        return;
 }
@@ -681,7 +691,8 @@ static int prepare_read_message(struct ceph_connection *con)
 /*
  * read (part of) a message
  */
-static int read_message_partial(struct ceph_connection *con)
+static int read_message_partial(struct ceph_connection *con,
+                               struct ceph_socket *s)
 {
        struct ceph_msg *m = con->in_msg;
        void *p;
@@ -694,7 +705,7 @@ static int read_message_partial(struct ceph_connection *con)
        /* header */
        while (con->in_base_pos < sizeof(m->hdr)) {
                left = sizeof(m->hdr) - con->in_base_pos;
-               ret = ceph_tcp_recvmsg(con->sock, &m->hdr + con->in_base_pos,
+               ret = ceph_tcp_recvmsg(s, &m->hdr + con->in_base_pos,
                                       left);
                if (ret <= 0)
                        return ret;
@@ -710,7 +721,7 @@ static int read_message_partial(struct ceph_connection *con)
                                return -ENOMEM;
                }
                left = front_len - m->front.iov_len;
-               ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base +
+               ret = ceph_tcp_recvmsg(s, (char *)m->front.iov_base +
                                       m->front.iov_len, left);
                if (ret <= 0)
                        return ret;
@@ -757,7 +768,7 @@ static int read_message_partial(struct ceph_connection *con)
                        return 0;
                }
                p = kmap(m->pages[con->in_msg_pos.page]);
-               ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+               ret = ceph_tcp_recvmsg(s, p + con->in_msg_pos.page_pos,
                                       left);
                kunmap(m->pages[con->in_msg_pos.page]);
                mutex_unlock(&m->page_mutex);
@@ -774,7 +785,7 @@ static int read_message_partial(struct ceph_connection *con)
        /* footer */
        while (con->in_base_pos < sizeof(m->hdr) + sizeof(m->footer)) {
                left = sizeof(m->hdr) + sizeof(m->footer) - con->in_base_pos;
-               ret = ceph_tcp_recvmsg(con->sock, &m->footer +
+               ret = ceph_tcp_recvmsg(s, &m->footer +
                                       (con->in_base_pos - sizeof(m->hdr)),
                                       left);
                if (ret <= 0)
@@ -835,11 +846,12 @@ static void prepare_read_ack(struct ceph_connection *con)
 /*
  * read (part of) an ack
  */
-static int read_ack_partial(struct ceph_connection *con)
+static int read_ack_partial(struct ceph_connection *con,
+                           struct ceph_socket *s)
 {
        while (con->in_base_pos < sizeof(con->in_partial_ack)) {
                int left = sizeof(con->in_partial_ack) - con->in_base_pos;
-               int ret = ceph_tcp_recvmsg(con->sock,
+               int ret = ceph_tcp_recvmsg(s,
                                           (char *)&con->in_partial_ack +
                                           con->in_base_pos, left);
                if (ret <= 0)
@@ -874,7 +886,8 @@ static void process_ack(struct ceph_connection *con)
 /*
  * read portion of connect-side handshake on a new connection
  */
-static int read_connect_partial(struct ceph_connection *con)
+static int read_connect_partial(struct ceph_connection *con,
+                               struct ceph_socket *s)
 {
        int ret, to;
        dout(20, "read_connect_partial %p at %d\n", con, con->in_base_pos);
@@ -884,8 +897,7 @@ static int read_connect_partial(struct ceph_connection *con)
        while (con->in_base_pos < to) {
                int left = to - con->in_base_pos;
                int have = con->in_base_pos;
-               ret = ceph_tcp_recvmsg(con->sock,
-                                      (char *)&con->actual_peer_addr + have,
+               ret = ceph_tcp_recvmsg(s, (char *)&con->actual_peer_addr + have,
                                       left);
                if (ret <= 0)
                        goto out;
@@ -895,7 +907,7 @@ static int read_connect_partial(struct ceph_connection *con)
        /* in_tag */
        to += 1;
        if (con->in_base_pos < to) {
-               ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
+               ret = ceph_tcp_recvmsg(s, &con->in_tag, 1);
                if (ret <= 0)
                        goto out;
                con->in_base_pos += ret;
@@ -907,7 +919,7 @@ static int read_connect_partial(struct ceph_connection *con)
                if (con->in_base_pos < to) {
                        int left = to - con->in_base_pos;
                        int have = sizeof(con->in_connect_seq) - left;
-                       ret = ceph_tcp_recvmsg(con->sock,
+                       ret = ceph_tcp_recvmsg(s,
                                               (char *)&con->in_connect_seq +
                                               have, left);
                        if (ret <= 0)
@@ -982,8 +994,8 @@ static void process_connect(struct ceph_connection *con)
        case CEPH_MSGR_TAG_WAIT:
                dout(10, "process_connect peer connecting WAIT\n");
                set_bit(WAIT, &con->state);
-               ceph_sock_release(con->sock);
-               con->sock = NULL;
+               ceph_socket_put(con->s, 1);
+               con->s = NULL;
                break;
        case CEPH_MSGR_TAG_READY:
                dout(10, "process_connect got READY, now open\n");
@@ -1004,7 +1016,8 @@ static void process_connect(struct ceph_connection *con)
 /*
  * read portion of accept-side handshake on a newly accepted connection
  */
-static int read_accept_partial(struct ceph_connection *con)
+static int read_accept_partial(struct ceph_connection *con,
+                              struct ceph_socket *s)
 {
        int ret;
        int to;
@@ -1014,7 +1027,7 @@ static int read_accept_partial(struct ceph_connection *con)
        while (con->in_base_pos < to) {
                int left = to - con->in_base_pos;
                int have = con->in_base_pos;
-               ret = ceph_tcp_recvmsg(con->sock,
+               ret = ceph_tcp_recvmsg(s,
                                       (char *)&con->peer_addr + have, left);
                if (ret <= 0)
                        return ret;
@@ -1026,7 +1039,7 @@ static int read_accept_partial(struct ceph_connection *con)
        while (con->in_base_pos < to) {
                int left = to - con->in_base_pos;
                int have = sizeof(con->peer_addr) - left;
-               ret = ceph_tcp_recvmsg(con->sock,
+               ret = ceph_tcp_recvmsg(s,
                                       (char *)&con->in_connect_seq + have,
                                       left);
                if (ret <= 0)
@@ -1152,12 +1165,17 @@ static void process_accept(struct ceph_connection *con)
  */
 static void try_read(struct work_struct *work)
 {
-       int ret = -1;
-       struct ceph_connection *con;
+       struct ceph_connection *con = container_of(work, struct ceph_connection,
+                                                  rwork);
        struct ceph_messenger *msgr;
+       struct ceph_socket *s;
+       int ret = -1;
 
-       con = container_of(work, struct ceph_connection, rwork);
        dout(20, "try_read start on %p\n", con);
+       s = con->s;
+       if (!s)
+               goto out;
+       ceph_socket_get(s);
        msgr = con->msgr;
 
 retry:
@@ -1174,7 +1192,7 @@ more:
        }
        if (test_bit(ACCEPTING, &con->state)) {
                dout(20, "try_read accepting\n");
-               ret = read_accept_partial(con);
+               ret = read_accept_partial(con, s);
                if (ret <= 0)
                        goto done;
                process_accept(con);            /* accepted */
@@ -1182,7 +1200,7 @@ more:
        }
        if (test_bit(CONNECTING, &con->state)) {
                dout(20, "try_read connecting\n");
-               ret = read_connect_partial(con);
+               ret = read_connect_partial(con, s);
                if (ret <= 0)
                        goto done;
                process_connect(con);
@@ -1194,7 +1212,7 @@ more:
                static char buf[1024];
                int skip = min(1024, -con->in_base_pos);
                dout(20, "skipping %d / %d bytes\n", skip, -con->in_base_pos);
-               ret = ceph_tcp_recvmsg(con->sock, buf, skip);
+               ret = ceph_tcp_recvmsg(s, buf, skip);
                if (ret <= 0)
                        goto done;
                con->in_base_pos += ret;
@@ -1202,7 +1220,7 @@ more:
                        goto more;
        }
        if (con->in_tag == CEPH_MSGR_TAG_READY) {
-               ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
+               ret = ceph_tcp_recvmsg(s, &con->in_tag, 1);
                if (ret <= 0)
                        goto done;
                dout(30, "try_read got tag %d\n", (int)con->in_tag);
@@ -1221,7 +1239,7 @@ more:
                }
        }
        if (con->in_tag == CEPH_MSGR_TAG_MSG) {
-               ret = read_message_partial(con);
+               ret = read_message_partial(con, s);
                if (ret <= 0)
                        goto done;
                if (con->in_tag == CEPH_MSGR_TAG_READY)
@@ -1230,7 +1248,7 @@ more:
                goto more;
        }
        if (con->in_tag == CEPH_MSGR_TAG_ACK) {
-               ret = read_ack_partial(con);
+               ret = read_ack_partial(con, s);
                if (ret <= 0)
                        goto done;
                process_ack(con);
@@ -1251,6 +1269,7 @@ done:
 
 out:
        dout(20, "try_read done on %p\n", con);
+       ceph_socket_put(s, 0);
        put_connection(con);
        return;
 }
@@ -1280,7 +1299,7 @@ static void try_accept(struct work_struct *work)
        clear_bit(NEW, &new_con->state);
        new_con->in_tag = CEPH_MSGR_TAG_READY;  /* eventually, hopefully */
 
-       if (ceph_tcp_accept(msgr->listen_sock, new_con) < 0) {
+       if (ceph_tcp_accept(msgr->listen_s, new_con) < 0) {
                derr(1, "error accepting connection\n");
                put_connection(new_con);
                goto done;
@@ -1344,6 +1363,7 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
 
        dout(1, "messenger %p listening on %u.%u.%u.%u:%u\n", msgr,
             IPQUADPORT(msgr->inst.addr.ipaddr));
+
        return msgr;
 }
 
@@ -1354,9 +1374,8 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
        dout(2, "destroy %p\n", msgr);
        
        /* stop listener */
-       ceph_cancel_sock_callbacks(msgr->listen_sock);
+       ceph_socket_put(msgr->listen_s, 1);
        cancel_work_sync(&msgr->awork);
-       ceph_sock_release(msgr->listen_sock);
 
        /* kill off connections */
        spin_lock(&msgr->con_lock);
@@ -1370,7 +1389,7 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
                
                /* in case there's queued work... */
                spin_unlock(&msgr->con_lock);
-               ceph_cancel_sock_callbacks(con->sock);
+               ceph_cancel_sock_callbacks(con->s);
                cancel_work_sync(&con->rwork);
                cancel_delayed_work_sync(&con->swork);
                put_connection(con);
index 356b8faf9228308b238aa3342b1e526e66055b00..5c3d955e10825ffd11902248394d3537b37cd95b 100644 (file)
@@ -1,6 +1,7 @@
 #ifndef __FS_CEPH_MESSENGER_H
 #define __FS_CEPH_MESSENGER_H
 
+#include <linux/kobject.h>
 #include <linux/mutex.h>
 #include <linux/net.h>
 #include <linux/radix-tree.h>
@@ -39,7 +40,7 @@ struct ceph_messenger {
        ceph_msgr_peer_reset_t peer_reset;
        ceph_msgr_prepare_pages_t prepare_pages;
        struct ceph_entity_inst inst;    /* my name+address */
-       struct socket *listen_sock;      /* listening socket */
+       struct ceph_socket *listen_s;    /* listening socket */
        struct work_struct awork;        /* accept work */
        spinlock_t con_lock;
        struct list_head con_all;        /* all connections */
@@ -68,6 +69,7 @@ struct ceph_msg_pos {
 #define BASE_DELAY_INTERVAL    (HZ/2)
 #define MAX_DELAY_INTERVAL     (5U * 60 * HZ)
 
+
 /* ceph_connection state bit flags */
 #define NEW            0
 #define CONNECTING     1
@@ -84,7 +86,7 @@ struct ceph_msg_pos {
 
 struct ceph_connection {
        struct ceph_messenger *msgr;
-       struct socket *sock;    /* connection socket */
+       struct ceph_socket *s;  /* connection socket */
        unsigned long state;    /* connection state */
        const char *error_msg;