- should we be using debugfs?
- a dir for each client instance (client###)?
- hooks to get mds, osd, monmap epoch #s
+- clean up messenger vs ktcp
+ - hook into sysfs?
- vfs
- can we use dentry_path(), if it gets merged into mainline?
- io / osd client
struct workqueue_struct *send_wq; /* send work queue */
struct workqueue_struct *accept_wq; /* accept work queue */
+struct kobject *ceph_sockets_kobj;
+
+/*
+ * sockets
+ */
+void ceph_socket_destroy(struct kobject *kobj)
+{
+ struct ceph_socket *s = container_of(kobj, struct ceph_socket, kobj);
+ dout(10, "socket_destroy %p\n", s);
+ sock_release(s->sock);
+ kfree(s);
+}
+
+struct kobj_type ceph_socket_type = {
+ .release = ceph_socket_destroy,
+};
+
+struct ceph_socket *ceph_socket_create()
+{
+ struct ceph_socket *s;
+ int err;
+
+ s = kzalloc(sizeof(*s), GFP_NOFS);
+ if (!s) {
+ derr(10, "ENOMEM creating ceph_socket\n");
+ return ERR_PTR(-ENOMEM);
+ }
+
+ err = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &s->sock);
+ if (err) {
+ derr(10, "sock_create_kern error %d\n", err);
+ return ERR_PTR(err);
+ }
+
+ kobject_init_and_add(&s->kobj, &ceph_socket_type,
+ ceph_sockets_kobj,
+ "socket %p", s);
+ return s;
+}
+
+void ceph_socket_get(struct ceph_socket *s)
+{
+ struct kobject *r;
+
+ dout(10, "socket_get %p\n", s);
+ r = kobject_get(&s->kobj);
+ BUG_ON(!r);
+}
+
+void ceph_socket_put(struct ceph_socket *s, int die)
+{
+ dout(10, "socket_put %p\n", s);
+ if (!s)
+ return;
+ if (die)
+ ceph_cancel_sock_callbacks(s);
+ kobject_put(&s->kobj);
+}
+
+
+
/*
* socket callback functions
*/
}
/* make a listening socket active by setting up the data ready call back */
-static void listen_sock_callbacks(struct socket *sock,
+static void listen_sock_callbacks(struct ceph_socket *s,
struct ceph_messenger *msgr)
{
- struct sock *sk = sock->sk;
+ struct sock *sk = s->sock->sk;
sk->sk_user_data = (void *)msgr;
sk->sk_data_ready = ceph_accept_ready;
}
/* make a socket active by setting up the call back functions */
-static void set_sock_callbacks(struct socket *sock, struct ceph_connection *con)
+static void set_sock_callbacks(struct ceph_socket *s,
+ struct ceph_connection *con)
{
- struct sock *sk = sock->sk;
+ struct sock *sk = s->sock->sk;
sk->sk_user_data = (void *)con;
sk->sk_data_ready = ceph_data_ready;
sk->sk_write_space = ceph_write_space;
sk->sk_state_change = ceph_state_change;
}
-void ceph_cancel_sock_callbacks(struct socket *sock)
+void ceph_cancel_sock_callbacks(struct ceph_socket *s)
{
struct sock *sk;
- if (!sock)
+ if (!s)
return;
- sk = sock->sk;
+ sk = s->sock->sk;
sk->sk_user_data = 0;
sk->sk_data_ready = 0;
sk->sk_write_space = 0;
sk->sk_state_change = 0;
}
-void ceph_sock_release(struct socket *sock)
-{
- if (!sock)
- return;
- ceph_cancel_sock_callbacks(sock);
- sock_release(sock);
-}
/*
* initiate connection to a remote socket.
*/
-int ceph_tcp_connect(struct ceph_connection *con)
+struct ceph_socket *ceph_tcp_connect(struct ceph_connection *con)
{
int ret;
struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
+ struct ceph_socket *s;
- ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock);
- if (ret < 0) {
- derr(1, "connect sock_create_kern error: %d\n", ret);
- goto done;
- }
-
- con->sock->sk->sk_allocation = GFP_NOFS;
+ s = ceph_socket_create();
+ if (IS_ERR(s))
+ return s;
+
+ con->s = s;
+ s->sock->sk->sk_allocation = GFP_NOFS;
- set_sock_callbacks(con->sock, con);
+ set_sock_callbacks(s, con);
- ret = con->sock->ops->connect(con->sock, paddr,
- sizeof(struct sockaddr_in), O_NONBLOCK);
+ ret = s->sock->ops->connect(s->sock, paddr,
+ sizeof(struct sockaddr_in), O_NONBLOCK);
if (ret == -EINPROGRESS) {
dout(20, "connect EINPROGRESS sk_state = = %u\n",
- con->sock->sk->sk_state);
- return 0;
+ s->sock->sk->sk_state);
+ ret = 0;
}
if (ret < 0) {
/* TBD check for fatal errors, retry if not fatal.. */
derr(1, "connect %u.%u.%u.%u:%u error: %d\n",
IPQUADPORT(*(struct sockaddr_in *)paddr), ret);
- sock_release(con->sock);
- con->sock = NULL;
+ ceph_socket_put(s, 1);
+ con->s = 0;
}
-done:
- return ret;
+
+ if (ret < 0)
+ return ERR_PTR(ret);
+ ceph_socket_get(s); /* for caller */
+ return s;
}
/*
int ceph_tcp_listen(struct ceph_messenger *msgr)
{
int ret;
- struct socket *sock = NULL;
int optval = 1;
struct sockaddr_in *myaddr = &msgr->inst.addr.ipaddr;
int nlen;
+ struct ceph_socket *s;
- ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
- if (ret < 0) {
- derr(0, "sock_create_kern error: %d\n", ret);
- return ret;
- }
+ s = ceph_socket_create();
+ if (IS_ERR(s))
+ return PTR_ERR(s);
- sock->sk->sk_allocation = GFP_NOFS;
+ s->sock->sk->sk_allocation = GFP_NOFS;
- ret = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+ ret = kernel_setsockopt(s->sock, SOL_SOCKET, SO_REUSEADDR,
(char *)&optval, sizeof(optval));
if (ret < 0) {
derr(0, "Failed to set SO_REUSEADDR: %d\n", ret);
goto err;
}
- ret = sock->ops->bind(sock, (struct sockaddr *)myaddr, sizeof(*myaddr));
+ ret = s->sock->ops->bind(s->sock, (struct sockaddr *)myaddr,
+ sizeof(*myaddr));
if (ret < 0) {
derr(0, "Failed to bind: %d\n", ret);
goto err;
/* what port did we bind to? */
nlen = sizeof(*myaddr);
- ret = sock->ops->getname(sock, (struct sockaddr *)myaddr, &nlen, 0);
+ ret = s->sock->ops->getname(s->sock, (struct sockaddr *)myaddr, &nlen,
+ 0);
if (ret < 0) {
derr(0, "failed to getsockname: %d\n", ret);
goto err;
}
dout(10, "listen on port %d\n", ntohs(myaddr->sin_port));
- ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
+ ret = kernel_setsockopt(s->sock, SOL_SOCKET, SO_KEEPALIVE,
(char *)&optval, sizeof(optval));
if (ret < 0) {
derr(0, "Failed to set SO_KEEPALIVE: %d\n", ret);
goto err;
}
- msgr->listen_sock = sock;
-
/* TBD: probaby want to tune the backlog queue .. */
- ret = sock->ops->listen(sock, NUM_BACKUP);
+ ret = s->sock->ops->listen(s->sock, NUM_BACKUP);
if (ret < 0) {
derr(0, "kernel_listen error: %d\n", ret);
- msgr->listen_sock = NULL;
goto err;
}
- /* setup callbacks */
- listen_sock_callbacks(msgr->listen_sock, msgr);
-
+ /* ok! */
+ msgr->listen_s = s;
+ listen_sock_callbacks(s, msgr);
return ret;
+
err:
- sock_release(sock);
+ ceph_socket_put(s, 0);
return ret;
}
/*
* accept a connection
*/
-int ceph_tcp_accept(struct socket *sock, struct ceph_connection *con)
+int ceph_tcp_accept(struct ceph_socket *ls, struct ceph_connection *con)
{
int ret;
struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
int len;
+ struct ceph_socket *s;
+
+ s = ceph_socket_create();
+ if (IS_ERR(s))
+ return PTR_ERR(s);
+ con->s = s;
+ s->sock->sk->sk_allocation = GFP_NOFS;
- ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock);
- if (ret < 0) {
- derr(0, "sock_create_kern error: %d\n", ret);
- goto done;
- }
-
- con->sock->sk->sk_allocation = GFP_NOFS;
-
- ret = sock->ops->accept(sock, con->sock, O_NONBLOCK);
- /* ret = kernel_accept(sock, &new_sock, sock->file->f_flags); */
+ ret = ls->sock->ops->accept(ls->sock, s->sock, O_NONBLOCK);
if (ret < 0) {
derr(0, "accept error: %d\n", ret);
goto err;
}
/* setup callbacks */
- set_sock_callbacks(con->sock, con);
+ set_sock_callbacks(s, con);
- con->sock->ops = sock->ops;
- con->sock->type = sock->type;
- ret = con->sock->ops->getname(con->sock, paddr, &len, 2);
+ s->sock->ops = ls->sock->ops;
+ s->sock->type = ls->sock->type;
+ ret = s->sock->ops->getname(s->sock, paddr, &len, 2);
if (ret < 0) {
derr(0, "getname error: %d\n", ret);
goto err;
}
-
-done:
return ret;
+
err:
- sock_release(con->sock);
+ ceph_socket_put(s, 0);
+ con->s = 0;
return ret;
}
/*
* receive a message this may return after partial send
*/
-int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
+int ceph_tcp_recvmsg(struct ceph_socket *s, void *buf, size_t len)
{
struct kvec iov = {buf, len};
struct msghdr msg = {.msg_flags = 0};
int rlen = 0; /* length read */
msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
- rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
+ rlen = kernel_recvmsg(s->sock, &msg, &iov, 1, len, msg.msg_flags);
return(rlen);
}
/*
* Send a message this may return after partial send
*/
-int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
+int ceph_tcp_sendmsg(struct ceph_socket *s, struct kvec *iov,
size_t kvlen, size_t len, int more)
{
struct msghdr msg = {.msg_flags = 0};
msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */
/*printk(KERN_DEBUG "before sendmsg %d\n", len);*/
- rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len);
+ rlen = kernel_sendmsg(s->sock, &msg, iov, kvlen, len);
/*printk(KERN_DEBUG "after sendmsg %d\n", rlen);*/
return(rlen);
}
extern struct workqueue_struct *recv_wq; /* receive work queue */
extern struct workqueue_struct *send_wq; /* send work queue */
+/* wrap socket, since we use/drop it from multiple threads */
+extern struct kobject *ceoh_sockets_kobj;
+
+struct ceph_socket {
+ struct kobject kobj;
+ struct socket *sock;
+};
+
/* prototype definitions */
-int ceph_tcp_connect(struct ceph_connection *);
+struct ceph_socket *ceph_tcp_connect(struct ceph_connection *);
int ceph_tcp_listen(struct ceph_messenger *);
-int ceph_tcp_accept(struct socket *, struct ceph_connection *);
-int ceph_tcp_recvmsg(struct socket *, void *, size_t );
-int ceph_tcp_sendmsg(struct socket *, struct kvec *, size_t, size_t, int more);
-void ceph_cancel_sock_callbacks(struct socket *);
-void ceph_sock_release(struct socket *);
+int ceph_tcp_accept(struct ceph_socket *, struct ceph_connection *);
+int ceph_tcp_recvmsg(struct ceph_socket *, void *, size_t );
+int ceph_tcp_sendmsg(struct ceph_socket *, struct kvec *, size_t, size_t, int more);
+void ceph_cancel_sock_callbacks(struct ceph_socket *);
int ceph_workqueue_init(void);
void ceph_workqueue_shutdown(void);
-/* Well known port for ceph client listener.. */
-#define CEPH_PORT 2002
+extern struct ceph_socket *ceph_socket_create(void);
+extern void ceph_socket_get(struct ceph_socket *s);
+extern void ceph_socket_put(struct ceph_socket *s, int die);
+
/* Max number of outstanding connections in listener queueu */
#define NUM_BACKUP 10
#endif
+
+
/*
* connections
*/
dout(20, "put_connection destroying %p\n", con);
ceph_msg_put_list(&con->out_queue);
ceph_msg_put_list(&con->out_sent);
- ceph_sock_release(con->sock);
+ ceph_socket_put(con->s, 1);
kfree(con);
}
}
if (con->delay) {
dout(30, "fault tcp_close delay != 0\n");
- ceph_sock_release(con->sock);
- con->sock = NULL;
+ ceph_socket_put(con->s, 1);
+ con->s = NULL;
set_bit(NEW, &con->state);
/*
* 0 -> socket full, but more to do
* <0 -> error
*/
-static int write_partial_kvec(struct ceph_connection *con)
+static int write_partial_kvec(struct ceph_connection *con,
+ struct ceph_socket *s)
{
int ret;
dout(10, "write_partial_kvec have %d left\n", con->out_kvec_bytes);
while (con->out_kvec_bytes > 0) {
- ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
+ ret = ceph_tcp_sendmsg(s, con->out_kvec_cur,
con->out_kvec_left, con->out_kvec_bytes,
con->out_more);
if (ret <= 0)
}
static int write_partial_msg_pages(struct ceph_connection *con,
+ struct ceph_socket *s,
struct ceph_msg *msg)
{
struct kvec kv;
kv.iov_base = kaddr + con->out_msg_pos.page_pos;
kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
(int)(data_len - con->out_msg_pos.data_pos));
- ret = ceph_tcp_sendmsg(con->sock, &kv, 1, kv.iov_len, 1);
+ ret = ceph_tcp_sendmsg(s, &kv, 1, kv.iov_len, 1);
if (msg->pages)
kunmap(page);
mutex_unlock(&msg->page_mutex);
container_of(work, struct ceph_connection, swork.work);
struct ceph_messenger *msgr = con->msgr;
int ret = 1;
+ struct ceph_socket *s = con->s;
dout(30, "try_write start %p state %lu nref %d\n", con, con->state,
atomic_read(&con->nref));
+ if (s)
+ ceph_socket_get(s);
if (test_bit(CLOSED, &con->state)) {
dout(5, "try_write closed\n");
con->in_tag = CEPH_MSGR_TAG_READY;
dout(5, "try_write initiating connect on %p new state %lu\n",
con, con->state);
- ret = ceph_tcp_connect(con);
- if (ret < 0) {
+ BUG_ON(s);
+ s = ceph_tcp_connect(con);
+ dout(10, "tcp_connect returned %p\n", s);
+ if (IS_ERR(s)) {
con->error_msg = "connect error";
ceph_fault(con);
goto done;
/* kvec data queued? */
more_kvec:
if (con->out_kvec_left) {
- ret = write_partial_kvec(con);
+ ret = write_partial_kvec(con, s);
if (ret == 0)
goto done;
if (ret < 0) {
/* msg pages? */
if (con->out_msg && con->out_msg->nr_pages) {
- ret = write_partial_msg_pages(con, con->out_msg);
+ ret = write_partial_msg_pages(con, s, con->out_msg);
if (ret == 1)
goto more_kvec;
if (ret == 0)
done:
dout(30, "try_write done on %p\n", con);
+ ceph_socket_put(s, 0);
put_connection(con);
return;
}
/*
* read (part of) a message
*/
-static int read_message_partial(struct ceph_connection *con)
+static int read_message_partial(struct ceph_connection *con,
+ struct ceph_socket *s)
{
struct ceph_msg *m = con->in_msg;
void *p;
/* header */
while (con->in_base_pos < sizeof(m->hdr)) {
left = sizeof(m->hdr) - con->in_base_pos;
- ret = ceph_tcp_recvmsg(con->sock, &m->hdr + con->in_base_pos,
+ ret = ceph_tcp_recvmsg(s, &m->hdr + con->in_base_pos,
left);
if (ret <= 0)
return ret;
return -ENOMEM;
}
left = front_len - m->front.iov_len;
- ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base +
+ ret = ceph_tcp_recvmsg(s, (char *)m->front.iov_base +
m->front.iov_len, left);
if (ret <= 0)
return ret;
return 0;
}
p = kmap(m->pages[con->in_msg_pos.page]);
- ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+ ret = ceph_tcp_recvmsg(s, p + con->in_msg_pos.page_pos,
left);
kunmap(m->pages[con->in_msg_pos.page]);
mutex_unlock(&m->page_mutex);
/* footer */
while (con->in_base_pos < sizeof(m->hdr) + sizeof(m->footer)) {
left = sizeof(m->hdr) + sizeof(m->footer) - con->in_base_pos;
- ret = ceph_tcp_recvmsg(con->sock, &m->footer +
+ ret = ceph_tcp_recvmsg(s, &m->footer +
(con->in_base_pos - sizeof(m->hdr)),
left);
if (ret <= 0)
/*
* read (part of) an ack
*/
-static int read_ack_partial(struct ceph_connection *con)
+static int read_ack_partial(struct ceph_connection *con,
+ struct ceph_socket *s)
{
while (con->in_base_pos < sizeof(con->in_partial_ack)) {
int left = sizeof(con->in_partial_ack) - con->in_base_pos;
- int ret = ceph_tcp_recvmsg(con->sock,
+ int ret = ceph_tcp_recvmsg(s,
(char *)&con->in_partial_ack +
con->in_base_pos, left);
if (ret <= 0)
/*
* read portion of connect-side handshake on a new connection
*/
-static int read_connect_partial(struct ceph_connection *con)
+static int read_connect_partial(struct ceph_connection *con,
+ struct ceph_socket *s)
{
int ret, to;
dout(20, "read_connect_partial %p at %d\n", con, con->in_base_pos);
while (con->in_base_pos < to) {
int left = to - con->in_base_pos;
int have = con->in_base_pos;
- ret = ceph_tcp_recvmsg(con->sock,
- (char *)&con->actual_peer_addr + have,
+ ret = ceph_tcp_recvmsg(s, (char *)&con->actual_peer_addr + have,
left);
if (ret <= 0)
goto out;
/* in_tag */
to += 1;
if (con->in_base_pos < to) {
- ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
+ ret = ceph_tcp_recvmsg(s, &con->in_tag, 1);
if (ret <= 0)
goto out;
con->in_base_pos += ret;
if (con->in_base_pos < to) {
int left = to - con->in_base_pos;
int have = sizeof(con->in_connect_seq) - left;
- ret = ceph_tcp_recvmsg(con->sock,
+ ret = ceph_tcp_recvmsg(s,
(char *)&con->in_connect_seq +
have, left);
if (ret <= 0)
case CEPH_MSGR_TAG_WAIT:
dout(10, "process_connect peer connecting WAIT\n");
set_bit(WAIT, &con->state);
- ceph_sock_release(con->sock);
- con->sock = NULL;
+ ceph_socket_put(con->s, 1);
+ con->s = NULL;
break;
case CEPH_MSGR_TAG_READY:
dout(10, "process_connect got READY, now open\n");
/*
* read portion of accept-side handshake on a newly accepted connection
*/
-static int read_accept_partial(struct ceph_connection *con)
+static int read_accept_partial(struct ceph_connection *con,
+ struct ceph_socket *s)
{
int ret;
int to;
while (con->in_base_pos < to) {
int left = to - con->in_base_pos;
int have = con->in_base_pos;
- ret = ceph_tcp_recvmsg(con->sock,
+ ret = ceph_tcp_recvmsg(s,
(char *)&con->peer_addr + have, left);
if (ret <= 0)
return ret;
while (con->in_base_pos < to) {
int left = to - con->in_base_pos;
int have = sizeof(con->peer_addr) - left;
- ret = ceph_tcp_recvmsg(con->sock,
+ ret = ceph_tcp_recvmsg(s,
(char *)&con->in_connect_seq + have,
left);
if (ret <= 0)
*/
static void try_read(struct work_struct *work)
{
- int ret = -1;
- struct ceph_connection *con;
+ struct ceph_connection *con = container_of(work, struct ceph_connection,
+ rwork);
struct ceph_messenger *msgr;
+ struct ceph_socket *s;
+ int ret = -1;
- con = container_of(work, struct ceph_connection, rwork);
dout(20, "try_read start on %p\n", con);
+ s = con->s;
+ if (!s)
+ goto out;
+ ceph_socket_get(s);
msgr = con->msgr;
retry:
}
if (test_bit(ACCEPTING, &con->state)) {
dout(20, "try_read accepting\n");
- ret = read_accept_partial(con);
+ ret = read_accept_partial(con, s);
if (ret <= 0)
goto done;
process_accept(con); /* accepted */
}
if (test_bit(CONNECTING, &con->state)) {
dout(20, "try_read connecting\n");
- ret = read_connect_partial(con);
+ ret = read_connect_partial(con, s);
if (ret <= 0)
goto done;
process_connect(con);
static char buf[1024];
int skip = min(1024, -con->in_base_pos);
dout(20, "skipping %d / %d bytes\n", skip, -con->in_base_pos);
- ret = ceph_tcp_recvmsg(con->sock, buf, skip);
+ ret = ceph_tcp_recvmsg(s, buf, skip);
if (ret <= 0)
goto done;
con->in_base_pos += ret;
goto more;
}
if (con->in_tag == CEPH_MSGR_TAG_READY) {
- ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
+ ret = ceph_tcp_recvmsg(s, &con->in_tag, 1);
if (ret <= 0)
goto done;
dout(30, "try_read got tag %d\n", (int)con->in_tag);
}
}
if (con->in_tag == CEPH_MSGR_TAG_MSG) {
- ret = read_message_partial(con);
+ ret = read_message_partial(con, s);
if (ret <= 0)
goto done;
if (con->in_tag == CEPH_MSGR_TAG_READY)
goto more;
}
if (con->in_tag == CEPH_MSGR_TAG_ACK) {
- ret = read_ack_partial(con);
+ ret = read_ack_partial(con, s);
if (ret <= 0)
goto done;
process_ack(con);
out:
dout(20, "try_read done on %p\n", con);
+ ceph_socket_put(s, 0);
put_connection(con);
return;
}
clear_bit(NEW, &new_con->state);
new_con->in_tag = CEPH_MSGR_TAG_READY; /* eventually, hopefully */
- if (ceph_tcp_accept(msgr->listen_sock, new_con) < 0) {
+ if (ceph_tcp_accept(msgr->listen_s, new_con) < 0) {
derr(1, "error accepting connection\n");
put_connection(new_con);
goto done;
dout(1, "messenger %p listening on %u.%u.%u.%u:%u\n", msgr,
IPQUADPORT(msgr->inst.addr.ipaddr));
+
return msgr;
}
dout(2, "destroy %p\n", msgr);
/* stop listener */
- ceph_cancel_sock_callbacks(msgr->listen_sock);
+ ceph_socket_put(msgr->listen_s, 1);
cancel_work_sync(&msgr->awork);
- ceph_sock_release(msgr->listen_sock);
/* kill off connections */
spin_lock(&msgr->con_lock);
/* in case there's queued work... */
spin_unlock(&msgr->con_lock);
- ceph_cancel_sock_callbacks(con->sock);
+ ceph_cancel_sock_callbacks(con->s);
cancel_work_sync(&con->rwork);
cancel_delayed_work_sync(&con->swork);
put_connection(con);
#ifndef __FS_CEPH_MESSENGER_H
#define __FS_CEPH_MESSENGER_H
+#include <linux/kobject.h>
#include <linux/mutex.h>
#include <linux/net.h>
#include <linux/radix-tree.h>
ceph_msgr_peer_reset_t peer_reset;
ceph_msgr_prepare_pages_t prepare_pages;
struct ceph_entity_inst inst; /* my name+address */
- struct socket *listen_sock; /* listening socket */
+ struct ceph_socket *listen_s; /* listening socket */
struct work_struct awork; /* accept work */
spinlock_t con_lock;
struct list_head con_all; /* all connections */
#define BASE_DELAY_INTERVAL (HZ/2)
#define MAX_DELAY_INTERVAL (5U * 60 * HZ)
+
/* ceph_connection state bit flags */
#define NEW 0
#define CONNECTING 1
struct ceph_connection {
struct ceph_messenger *msgr;
- struct socket *sock; /* connection socket */
+ struct ceph_socket *s; /* connection socket */
unsigned long state; /* connection state */
const char *error_msg;