From 131231fb36885a91d1ec14e01a4caca721bbec1b Mon Sep 17 00:00:00 2001 From: patiencew Date: Thu, 22 Nov 2007 01:07:22 +0000 Subject: [PATCH] added socket callbacks, worker threads etc.. git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2106 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/kernel/ktcp.c | 235 ++++++++++++++++++++++++++-------- trunk/ceph/kernel/ktcp.h | 8 +- trunk/ceph/kernel/messenger.c | 109 ++++++---------- trunk/ceph/kernel/messenger.h | 19 ++- 4 files changed, 238 insertions(+), 133 deletions(-) diff --git a/trunk/ceph/kernel/ktcp.c b/trunk/ceph/kernel/ktcp.c index 9bf03272e1403..d6bb68e1391b8 100644 --- a/trunk/ceph/kernel/ktcp.c +++ b/trunk/ceph/kernel/ktcp.c @@ -5,102 +5,190 @@ #include "messenger.h" #include "ktcp.h" +static struct workqueue_struct *recv_wq; /* receive work queue */ +static struct workqueue_struct *send_wq; /* send work queue */ -struct socket * _kconnect(struct sockaddr *saddr) +/* + * socket callback functions + */ +/* Data available on socket or listen socket received a connect */ +static void ceph_data_ready(struct sock *sk, int count_unused) +{ + struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data; + + printk(KERN_INFO "Entered ceph_data_ready state = %u\n", con->state); + queue_work(recv_wq, &con->rwork); +} + +/* socket has bufferspace for writing */ +static void ceph_write_space(struct sock *sk) +{ + struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data; + + printk(KERN_INFO "Entered ceph_write_space state = %u\n",con->state); + if (test_bit(WRITE_PEND, &con->state)) { + printk(KERN_INFO "WRITE_PEND set in connection\n"); + queue_work(send_wq, &con->swork); + } +} + +/* sockets state has change */ +static void ceph_state_change(struct sock *sk) +{ + struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data; + /* TBD: probably want to set our connection state to OPEN + * if state not set to READ or WRITE pending + */ + printk(KERN_INFO "Entered ceph_state_change state = %u\n", con->state); + if (sk->sk_state == TCP_ESTABLISHED) { + if (test_and_clear_bit(CONNECTING, &con->state)) + set_bit(OPEN, &con->state); + ceph_write_space(sk); + } +} + +/* make a socket active by setting up the call back functions */ +int add_sock_callbacks(struct socket *sock, struct ceph_connection *con) +{ + struct sock *sk = sock->sk; + sk->sk_user_data = con; + printk(KERN_INFO "Entered add_sock_callbacks\n"); + + /* Install callbacks */ + sk->sk_data_ready = ceph_data_ready; + sk->sk_write_space = ceph_write_space; + sk->sk_state_change = ceph_state_change; + + return 0; +} +/* + * initiate connection to a remote socket. + */ +int _kconnect(struct ceph_connection *con) { int ret; - struct socket *sd = NULL; + struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr; -/* TBD: somewhere check for a connection already established to this node? */ -/* if we are keeping connections alive for a period of time */ - ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sd); + set_bit(CONNECTING, &con->state); + + ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock); if (ret < 0) { - printk(KERN_INFO "sock_create_kern error: %d\n", ret); - } else { - /* or could call kernel_connect(), opted to reduce call overhead */ - ret = sd->ops->connect(sd, (struct sockaddr *) saddr, - sizeof (struct sockaddr_in),0); - if (ret < 0) { - printk(KERN_INFO "kernel_connect error: %d\n", ret); - sock_release(sd); - } - } - return(sd); + printk(KERN_INFO "sock_create_kern error: %d\n", ret); + goto done; + } + + /* setup callbacks */ + add_sock_callbacks(con->sock, con); + + + ret = con->sock->ops->connect(con->sock, paddr, + sizeof(struct sockaddr_in), O_NONBLOCK); + if (ret == -EINPROGRESS) return 0; + if (ret < 0) { + /* TBD check for fatal errors, retry if not fatal.. */ + printk(KERN_INFO "kernel_connect error: %d\n", ret); + sock_release(con->sock); + con->sock = NULL; + } +done: + return ret; } -struct socket * _klisten(struct sockaddr_in *in_addr) +int _klisten(struct ceph_messenger *msgr) { int ret; - struct socket *sd = NULL; + struct socket *sock = NULL; + int optval = 1; + struct sockaddr_in *myaddr = &msgr->inst.addr.ipaddr; - ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sd); + ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); if (ret < 0) { printk(KERN_INFO "sock_create_kern error: %d\n", ret); - return ERR_PTR(ret); + return ret; } + ret = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + (char *)&optval, sizeof(optval)); + if (ret < 0) { + printk("Failed to set SO_REUSEADDR: %d\n", ret); + goto err; + } + + /* set user_data to be the messenger */ + sock->sk->sk_user_data = msgr; /* no user specified address given so create, will allow arg to mount */ - if (!in_addr->sin_addr.s_addr) { - in_addr->sin_family = AF_INET; - in_addr->sin_addr.s_addr = htonl(INADDR_ANY); - in_addr->sin_port = htons(CEPH_PORT); /* known port for now */ - /* in_addr->sin_port = htons(0); */ /* any port */ + myaddr->sin_family = AF_INET; + myaddr->sin_addr.s_addr = htonl(INADDR_ANY); + myaddr->sin_port = htons(CEPH_PORT); /* known port for now */ + /* myaddr->sin_port = htons(0); */ /* any port */ + ret = sock->ops->bind(sock, (struct sockaddr *)myaddr, + sizeof(struct sockaddr_in)); + if (ret < 0) { + printk("Failed to bind to port %d\n", ret); + goto err; } -/* TBD: set sock options... */ - /* ret = kernel_setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, - (char *)optval, optlen); + ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, + (char *)&optval, sizeof(optval)); if (ret < 0) { - printk("Failed to set SO_REUSEADDR: %d\n", ret); - } */ - ret = sd->ops->bind(sd, (struct sockaddr*)in_addr, sizeof(in_addr)); -/* TBD: probaby want to tune the backlog queue .. */ - ret = sd->ops->listen(sd, NUM_BACKUP); + printk("Failed to set SO_KEEPALIVE: %d\n", ret); + goto err; + } + + msgr->listen_sock = sock; + + /* TBD: probaby want to tune the backlog queue .. */ + ret = sock->ops->listen(sock, NUM_BACKUP); if (ret < 0) { printk(KERN_INFO "kernel_listen error: %d\n", ret); - sock_release(sd); - sd = NULL; + msgr->listen_sock = NULL; + goto err; } - return(sd); + return ret; +err: + sock_release(sock); + return ret; } /* * Note: Maybe don't need this, or make inline... keep for now for debugging.. * we may need to add more functionality */ -struct socket *_kaccept(struct socket *sd) +struct socket *_kaccept(struct socket *sock) { - struct socket *new_sd = NULL; + struct socket *new_sock = NULL; int ret; - - ret = kernel_accept(sd, &new_sd, sd->file->f_flags); + + ret = kernel_accept(sock, &new_sock, sock->file->f_flags); if (ret < 0) { printk(KERN_INFO "kernel_accept error: %d\n", ret); - return(new_sd); + return(new_sock); } /* TBD: shall we check name for validity? */ - return(new_sd); + return(new_sock); } /* * receive a message this may return after partial send */ -int _krecvmsg(struct socket *sd, void *buf, size_t len, unsigned msgflags) +int _krecvmsg(struct socket *sock, void *buf, size_t len) { struct kvec iov = {buf, len}; - struct msghdr msg = {.msg_flags = msgflags}; + struct msghdr msg = {.msg_flags = 0}; int rlen = 0; /* length read */ printk(KERN_INFO "entered krevmsg\n"); msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL; /* receive one kvec for now... */ - rlen = kernel_recvmsg(sd, &msg, &iov, 1, len, msg.msg_flags); + rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags); if (rlen < 0) { printk(KERN_INFO "kernel_recvmsg error: %d\n", rlen); } + /* TBD: kernel_recvmsg doesn't fill in the name and namelen + */ return(rlen); } @@ -108,32 +196,77 @@ int _krecvmsg(struct socket *sd, void *buf, size_t len, unsigned msgflags) /* * Send a message this may return after partial send */ -int _ksendmsg(struct socket *sd, struct kvec *iov, - size_t kvlen, size_t len, unsigned msgflags) +int _ksendmsg(struct socket *sock, struct kvec *iov, size_t kvlen, size_t len) { - struct msghdr msg = {.msg_flags = msgflags}; + struct msghdr msg = {.msg_flags = 0}; int rlen = 0; printk(KERN_INFO "entered ksendmsg\n"); msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL; - rlen = kernel_sendmsg(sd, &msg, iov, kvlen, len); + rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len); if (rlen < 0) { printk(KERN_INFO "kernel_sendmsg error: %d\n", rlen); } return(rlen); } -struct sockaddr *_kgetname(struct socket *sd) +struct sockaddr *_kgetname(struct socket *sock) { struct sockaddr *saddr = NULL; int len; int ret; - if ((ret = sd->ops->getname(sd, (struct sockaddr *)saddr, + if ((ret = sock->ops->getname(sock, (struct sockaddr *)saddr, &len, 2) < 0)) { printk(KERN_INFO "kernel getname error: %d\n", ret); } return(saddr); } +/* + * workqueue initialization + */ + +int work_init(void) +{ + int ret = 0; + + printk(KERN_INFO "entered work_init\n"); + /* + * Create a num CPU threads to handle receive requests + * note: we can create more threads if needed to even out + * the scheduling of multiple requests.. + */ + recv_wq = create_workqueue("ceph-recv"); + ret = IS_ERR(recv_wq); + if (ret) { + printk(KERN_INFO "receive worker failed to start: %d\n", ret); + destroy_workqueue(recv_wq); + return ret; + } + + /* + * Create a single thread to handle send requests + * note: may use same thread pool as receive workers later... + */ + send_wq = create_singlethread_workqueue("ceph-send"); + ret = IS_ERR(send_wq); + if (ret) { + printk(KERN_INFO "send worker failed to start: %d\n", ret); + destroy_workqueue(send_wq); + return ret; + } + printk(KERN_INFO "successfully created wrkqueues\n"); + + return(ret); +} + +/* + * workqueue shutdown + */ +void shutdown_workqueues(void) +{ + destroy_workqueue(send_wq); + destroy_workqueue(recv_wq); +} diff --git a/trunk/ceph/kernel/ktcp.h b/trunk/ceph/kernel/ktcp.h index 279a56f090d42..0a16444207e77 100644 --- a/trunk/ceph/kernel/ktcp.h +++ b/trunk/ceph/kernel/ktcp.h @@ -2,11 +2,11 @@ #define _FS_CEPH_TCP_H /* prototype definitions */ -struct socket * _kconnect(struct sockaddr *); -struct socket * _klisten(struct sockaddr_in *); +int _kconnect(struct ceph_connection *); +int _klisten(struct ceph_messenger *); struct socket *_kaccept(struct socket *); -int _krecvmsg(struct socket *, void *, size_t , unsigned); -int _ksendmsg(struct socket *, struct kvec *, size_t, size_t, unsigned); +int _krecvmsg(struct socket *, void *, size_t ); +int _ksendmsg(struct socket *, struct kvec *, size_t, size_t); /* Well known port for ceph client listener.. */ #define CEPH_PORT 2002 diff --git a/trunk/ceph/kernel/messenger.c b/trunk/ceph/kernel/messenger.c index 8c43d1c060727..f4fd71d3b5e0f 100644 --- a/trunk/ceph/kernel/messenger.c +++ b/trunk/ceph/kernel/messenger.c @@ -61,6 +61,7 @@ static struct ceph_connection *new_connection(struct ceph_messenger *msgr) con->msgr = msgr; spin_lock_init(&con->con_lock); + set_bit(NEW, &con->state); INIT_WORK(&con->rwork, try_read); /* setup work structure */ INIT_WORK(&con->swork, try_write); /* setup work structure */ @@ -162,8 +163,8 @@ static void remove_connection(struct ceph_messenger *msgr, struct ceph_connectio spin_lock(&msgr->con_lock); list_del(&con->list_all); - if (con->state == CONNECTING || - con->state == OPEN) { + if (test_bit(CONNECTING, &con->state) || + test_bit(OPEN, &con->state)) { /* remove from con_open too */ if (list_empty(&con->list_bucket)) { /* last one */ @@ -190,30 +191,6 @@ static void replace_connection(struct ceph_messenger *msgr, struct ceph_connecti spin_unlock(&msgr->con_lock); put_connection(old); /* dec reference count */ } - - -/* - * initiate connection to a remote socket. - * - * TBD: make this async, somehow! - */ -static int do_connect(struct ceph_connection *con) -{ - struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr; - - dout(1, "do_connect on %p\n", con); - con->sock = _kconnect(paddr); - if (IS_ERR(con->sock)) { - con->sock = 0; - return PTR_ERR(con->sock); - } - - /* setup callbacks */ - - - return 0; -} - /* * non-blocking versions @@ -232,7 +209,7 @@ static int write_partial_kvec(struct ceph_connection *con) int ret; while (con->out_kvec_bytes > 0) { - ret = _ksendmsg(con->sock, con->out_kvec_cur, con->out_kvec_left, con->out_kvec_bytes, 0); + ret = _ksendmsg(con->sock, con->out_kvec_cur, con->out_kvec_left, con->out_kvec_bytes); if (ret < 0) return ret; /* error */ if (ret == 0) return 0; /* socket full */ con->out_kvec_bytes -= ret; @@ -263,7 +240,7 @@ static int write_partial_msg_pages(struct ceph_connection *con, struct ceph_msg kv.iov_base = kmap(msg->pages[con->out_msg_pos.page]) + con->out_msg_pos.page_pos; kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos), (int)(msg->hdr.data_len - con->out_msg_pos.data_pos)); - ret = _ksendmsg(con->sock, &kv, 1, kv.iov_len, 0); + ret = _ksendmsg(con->sock, &kv, 1, kv.iov_len); if (ret < 0) return ret; if (ret == 0) return 0; /* socket full */ con->out_msg_pos.data_pos += ret; @@ -374,10 +351,10 @@ more: if (ret == 0) goto done; - if (con->state == REJECTING) { + if (test_bit(REJECTING, &con->state)) { /* FIXME do something else here, pbly? */ remove_connection(msgr, con); - con->state = CLOSED; + set_bit(CLOSED, &con->state); put_connection(con); } @@ -440,7 +417,7 @@ static int read_message_partial(struct ceph_connection *con) /* header */ while (con->in_base_pos < sizeof(struct ceph_msg_header)) { left = sizeof(struct ceph_msg_header) - con->in_base_pos; - ret = _krecvmsg(con->sock, &m->hdr + con->in_base_pos, left, 0); + ret = _krecvmsg(con->sock, &m->hdr + con->in_base_pos, left); if (ret <= 0) return ret; con->in_base_pos += ret; if (con->in_base_pos == sizeof(struct ceph_msg_header)) { @@ -458,7 +435,7 @@ static int read_message_partial(struct ceph_connection *con) return -ENOMEM; } left = m->hdr.front_len - m->front.iov_len; - ret = _krecvmsg(con->sock, (char*)m->front.iov_base + m->front.iov_len, left, 0); + ret = _krecvmsg(con->sock, (char*)m->front.iov_base + m->front.iov_len, left); if (ret <= 0) return ret; m->front.iov_len += ret; } @@ -480,7 +457,7 @@ static int read_message_partial(struct ceph_connection *con) left = min((int)(m->hdr.data_len - con->in_msg_pos.data_pos), (int)(PAGE_SIZE - con->in_msg_pos.page_pos)); p = kmap(m->pages[con->in_msg_pos.page]); - ret = _krecvmsg(con->sock, p + con->in_msg_pos.page_pos, left, 0); + ret = _krecvmsg(con->sock, p + con->in_msg_pos.page_pos, left); if (ret <= 0) return ret; con->in_msg_pos.data_pos += ret; con->in_msg_pos.page_pos += ret; @@ -511,7 +488,7 @@ static int read_ack_partial(struct ceph_connection *con) { while (con->in_base_pos < sizeof(con->in_partial_ack)) { int left = sizeof(con->in_partial_ack) - con->in_base_pos; - int ret = _krecvmsg(con->sock, (char*)&con->in_partial_ack + con->in_base_pos, left, 0); + int ret = _krecvmsg(con->sock, (char*)&con->in_partial_ack + con->in_base_pos, left); if (ret <= 0) return ret; con->in_base_pos += ret; } @@ -541,7 +518,7 @@ static int read_accept_partial(struct ceph_connection *con) /* peer addr */ while (con->in_base_pos < sizeof(con->peer_addr)) { int left = sizeof(con->peer_addr) - con->in_base_pos; - ret = _krecvmsg(con->sock, (char*)&con->peer_addr + con->in_base_pos, left, 0); + ret = _krecvmsg(con->sock, (char*)&con->peer_addr + con->in_base_pos, left); if (ret <= 0) return ret; con->in_base_pos += ret; } @@ -550,7 +527,7 @@ static int read_accept_partial(struct ceph_connection *con) while (con->in_base_pos < sizeof(con->peer_addr) + sizeof(con->connect_seq)) { int off = con->in_base_pos - sizeof(con->peer_addr); int left = sizeof(con->peer_addr) + sizeof(con->connect_seq) - con->in_base_pos; - ret = _krecvmsg(con->sock, (char*)&con->connect_seq + off, left,0); + ret = _krecvmsg(con->sock, (char*)&con->connect_seq + off, left); if (ret <= 0) return ret; con->in_base_pos += ret; } @@ -569,30 +546,32 @@ static void process_accept(struct ceph_connection *con) existing = get_connection(con->msgr, &con->peer_addr); if (existing) { spin_lock(&existing->con_lock); - if ((existing->state == CONNECTING && compare_addr(&con->msgr->inst.addr, &con->peer_addr)) || - (existing->state == OPEN && con->connect_seq == existing->connect_seq)) { + if ((test_bit(CONNECTING, &existing->state) && + compare_addr(&con->msgr->inst.addr, &con->peer_addr)) || + (test_bit(OPEN, &existing->state) && + con->connect_seq == existing->connect_seq)) { /* replace existing with new connection */ replace_connection(con->msgr, existing, con); /* steal message queue */ list_splice_init(&con->out_queue, &existing->out_queue); /* fixme order */ con->out_seq = existing->out_seq; - con->state = OPEN; - existing->state = CLOSED; + set_bit(OPEN, &con->state); + set_bit(CLOSED, &existing->state); } else { /* reject new connection */ - con->state = REJECTING; + set_bit(REJECTING, &con->state); con->connect_seq = existing->connect_seq; /* send this with the reject */ } spin_unlock(&existing->con_lock); put_connection(existing); } else { add_connection(con->msgr, con); - con->state = OPEN; + set_bit(OPEN, &con->state); } spin_unlock(&con->msgr->con_lock); /* the result? */ - if (con->state == REJECTING) + if (test_bit(REJECTING, &con->state)) prepare_write_accept_reject(con); else prepare_write_accept_ready(con); @@ -615,13 +594,10 @@ more: /* * TBD: maybe store error in ceph_connection */ - /* if (con->state == CLOSED) return -1; */ if (test_bit(CLOSED, &con->state)) goto done; if (test_bit(ACCEPTING, &con->state)) { ret = read_accept_partial(con); - /* TBD: do something with error */ - /* if (ret <= 0) return ret; */ if (ret <= 0) goto done; /* accepted */ process_accept(con); @@ -629,9 +605,7 @@ more: } if (con->in_tag == CEPH_MSGR_TAG_READY) { - ret = _krecvmsg(con->sock, &con->in_tag, 1, 0); - /* if (ret <= 0) return ret; */ - /* TBD: do something with error */ + ret = _krecvmsg(con->sock, &con->in_tag, 1); if (ret <= 0) goto done; if (con->in_tag == CEPH_MSGR_TAG_MSG) prepare_read_message(con); @@ -645,8 +619,6 @@ more: } if (con->in_tag == CEPH_MSGR_TAG_MSG) { ret = read_message_partial(con); - /* if (ret <= 0) return ret; */ - /* TBD: do something with error */ if (ret <= 0) goto done; /* got a full message! */ msgr->dispatch(con->msgr->parent, con->in_msg); @@ -657,8 +629,6 @@ more: } if (con->in_tag == CEPH_MSGR_TAG_ACK) { ret = read_ack_partial(con); - /* if (ret <= 0) return ret; */ - /* TBD: do something with error */ if (ret <= 0) goto done; /* got an ack */ process_ack(con, con->in_partial_ack); @@ -668,6 +638,7 @@ more: bad: BUG_ON(1); /* shouldn't get here */ done: + con->error = ret; return; } @@ -733,7 +704,7 @@ done: struct ceph_messenger *ceph_messenger_create() { struct ceph_messenger *msgr; - struct sockaddr_in saddr; + int ret = 0; msgr = kzalloc(sizeof(*msgr), GFP_KERNEL); if (msgr == NULL) @@ -742,18 +713,12 @@ struct ceph_messenger *ceph_messenger_create() spin_lock_init(&msgr->con_lock); /* create listening socket */ - msgr->listen_sock = _klisten(&saddr); - if (IS_ERR(msgr->listen_sock)) { - int err = PTR_ERR(msgr->listen_sock); + ret = _klisten(msgr); + if(ret < 0) { kfree(msgr); - return ERR_PTR(err); + return ERR_PTR(ret); } - /* determine my ip:port */ - msgr->inst.addr.ipaddr.sin_family = saddr.sin_family; - msgr->inst.addr.ipaddr.sin_port = saddr.sin_port; - msgr->inst.addr.ipaddr.sin_addr = saddr.sin_addr; - /* TBD: setup callback for accept */ INIT_WORK(&msgr->awork, try_accept); /* setup work structure */ @@ -772,6 +737,7 @@ struct ceph_messenger *ceph_messenger_create() int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg) { struct ceph_connection *con; + int ret = 0; /* set source */ msg->hdr.src = msgr->inst; @@ -787,7 +753,6 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg) ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), ntohl(msg->hdr.dst.addr.ipaddr.sin_port)); con->peer_addr = msg->hdr.dst.addr; - con->state = CONNECTING; add_connection(msgr, con); } else { dout(5, "had connection to peer %x:%d\n", @@ -799,8 +764,18 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg) spin_lock(&con->con_lock); /* initiate connect? */ - if (con->state == CONNECTING) - do_connect(con); + if (test_bit(NEW, &con->state)) { + ret = _kconnect(con); + if (ret < 0){ + derr(1, "connection failure to peer %x:%d\n", + ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), + ntohl(msg->hdr.dst.addr.ipaddr.sin_port)); + remove_connection(msgr, con); + kfree(con); + return(ret); + + } + } /* queue */ dout(1, "queuing outgoing message for %s.%d\n", @@ -810,7 +785,7 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg) spin_unlock(&con->con_lock); put_connection(con); - return 0; + return ret; } diff --git a/trunk/ceph/kernel/messenger.h b/trunk/ceph/kernel/messenger.h index 1aacd77724c0f..b2048a47b4b50 100644 --- a/trunk/ceph/kernel/messenger.h +++ b/trunk/ceph/kernel/messenger.h @@ -52,20 +52,18 @@ struct ceph_msg_pos { /* current state of connection */ -enum ceph_connection_state { - NEW = 1, - ACCEPTING = 2, - CONNECTING = 4, - OPEN = 8, - REJECTING = 16, - CLOSED = 32, - READ_PEND = 64, - WRITE_PEND = 128 -}; +#define NEW 1 +#define CONNECTING 2 +#define ACCEPTING 3 +#define OPEN 4 +#define WRITE_PEND 5 +#define REJECTING 6 +#define CLOSED 7 struct ceph_connection { struct ceph_messenger *msgr; struct socket *sock; /* connection socket */ + __u32 state; /* connection state */ atomic_t nref; spinlock_t con_lock; /* connection lock */ @@ -74,7 +72,6 @@ struct ceph_connection { struct list_head list_bucket; /* msgr->con_open or con_accepting */ struct ceph_entity_addr peer_addr; /* peer address */ - enum ceph_connection_state state; __u32 connect_seq; __u32 out_seq; /* last message queued for send */ __u32 in_seq, in_seq_acked; /* last message received, acked */ -- 2.39.5