#include "messenger.h"
#include "ktcp.h"
+static struct workqueue_struct *recv_wq; /* receive work queue */
+static struct workqueue_struct *send_wq; /* send work queue */
-struct socket * _kconnect(struct sockaddr *saddr)
+/*
+ * socket callback functions
+ */
+/* Data available on socket or listen socket received a connect */
+static void ceph_data_ready(struct sock *sk, int count_unused)
+{
+ struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data;
+
+ printk(KERN_INFO "Entered ceph_data_ready state = %u\n", con->state);
+ queue_work(recv_wq, &con->rwork);
+}
+
+/* socket has bufferspace for writing */
+static void ceph_write_space(struct sock *sk)
+{
+ struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data;
+
+ printk(KERN_INFO "Entered ceph_write_space state = %u\n",con->state);
+ if (test_bit(WRITE_PEND, &con->state)) {
+ printk(KERN_INFO "WRITE_PEND set in connection\n");
+ queue_work(send_wq, &con->swork);
+ }
+}
+
+/* sockets state has change */
+static void ceph_state_change(struct sock *sk)
+{
+ struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data;
+ /* TBD: probably want to set our connection state to OPEN
+ * if state not set to READ or WRITE pending
+ */
+ printk(KERN_INFO "Entered ceph_state_change state = %u\n", con->state);
+ if (sk->sk_state == TCP_ESTABLISHED) {
+ if (test_and_clear_bit(CONNECTING, &con->state))
+ set_bit(OPEN, &con->state);
+ ceph_write_space(sk);
+ }
+}
+
+/* make a socket active by setting up the call back functions */
+int add_sock_callbacks(struct socket *sock, struct ceph_connection *con)
+{
+ struct sock *sk = sock->sk;
+ sk->sk_user_data = con;
+ printk(KERN_INFO "Entered add_sock_callbacks\n");
+
+ /* Install callbacks */
+ sk->sk_data_ready = ceph_data_ready;
+ sk->sk_write_space = ceph_write_space;
+ sk->sk_state_change = ceph_state_change;
+
+ return 0;
+}
+/*
+ * initiate connection to a remote socket.
+ */
+int _kconnect(struct ceph_connection *con)
{
int ret;
- struct socket *sd = NULL;
+ struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
-/* TBD: somewhere check for a connection already established to this node? */
-/* if we are keeping connections alive for a period of time */
- ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sd);
+ set_bit(CONNECTING, &con->state);
+
+ ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock);
if (ret < 0) {
- printk(KERN_INFO "sock_create_kern error: %d\n", ret);
- } else {
- /* or could call kernel_connect(), opted to reduce call overhead */
- ret = sd->ops->connect(sd, (struct sockaddr *) saddr,
- sizeof (struct sockaddr_in),0);
- if (ret < 0) {
- printk(KERN_INFO "kernel_connect error: %d\n", ret);
- sock_release(sd);
- }
- }
- return(sd);
+ printk(KERN_INFO "sock_create_kern error: %d\n", ret);
+ goto done;
+ }
+
+ /* setup callbacks */
+ add_sock_callbacks(con->sock, con);
+
+
+ ret = con->sock->ops->connect(con->sock, paddr,
+ sizeof(struct sockaddr_in), O_NONBLOCK);
+ if (ret == -EINPROGRESS) return 0;
+ if (ret < 0) {
+ /* TBD check for fatal errors, retry if not fatal.. */
+ printk(KERN_INFO "kernel_connect error: %d\n", ret);
+ sock_release(con->sock);
+ con->sock = NULL;
+ }
+done:
+ return ret;
}
-struct socket * _klisten(struct sockaddr_in *in_addr)
+int _klisten(struct ceph_messenger *msgr)
{
int ret;
- struct socket *sd = NULL;
+ struct socket *sock = NULL;
+ int optval = 1;
+ struct sockaddr_in *myaddr = &msgr->inst.addr.ipaddr;
- ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sd);
+ ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
if (ret < 0) {
printk(KERN_INFO "sock_create_kern error: %d\n", ret);
- return ERR_PTR(ret);
+ return ret;
}
+ ret = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+ (char *)&optval, sizeof(optval));
+ if (ret < 0) {
+ printk("Failed to set SO_REUSEADDR: %d\n", ret);
+ goto err;
+ }
+
+ /* set user_data to be the messenger */
+ sock->sk->sk_user_data = msgr;
/* no user specified address given so create, will allow arg to mount */
- if (!in_addr->sin_addr.s_addr) {
- in_addr->sin_family = AF_INET;
- in_addr->sin_addr.s_addr = htonl(INADDR_ANY);
- in_addr->sin_port = htons(CEPH_PORT); /* known port for now */
- /* in_addr->sin_port = htons(0); */ /* any port */
+ myaddr->sin_family = AF_INET;
+ myaddr->sin_addr.s_addr = htonl(INADDR_ANY);
+ myaddr->sin_port = htons(CEPH_PORT); /* known port for now */
+ /* myaddr->sin_port = htons(0); */ /* any port */
+ ret = sock->ops->bind(sock, (struct sockaddr *)myaddr,
+ sizeof(struct sockaddr_in));
+ if (ret < 0) {
+ printk("Failed to bind to port %d\n", ret);
+ goto err;
}
-/* TBD: set sock options... */
- /* ret = kernel_setsockopt(sd, SOL_SOCKET, SO_REUSEADDR,
- (char *)optval, optlen);
+ ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
+ (char *)&optval, sizeof(optval));
if (ret < 0) {
- printk("Failed to set SO_REUSEADDR: %d\n", ret);
- } */
- ret = sd->ops->bind(sd, (struct sockaddr*)in_addr, sizeof(in_addr));
-/* TBD: probaby want to tune the backlog queue .. */
- ret = sd->ops->listen(sd, NUM_BACKUP);
+ printk("Failed to set SO_KEEPALIVE: %d\n", ret);
+ goto err;
+ }
+
+ msgr->listen_sock = sock;
+
+ /* TBD: probaby want to tune the backlog queue .. */
+ ret = sock->ops->listen(sock, NUM_BACKUP);
if (ret < 0) {
printk(KERN_INFO "kernel_listen error: %d\n", ret);
- sock_release(sd);
- sd = NULL;
+ msgr->listen_sock = NULL;
+ goto err;
}
- return(sd);
+ return ret;
+err:
+ sock_release(sock);
+ return ret;
}
/*
* Note: Maybe don't need this, or make inline... keep for now for debugging..
* we may need to add more functionality
*/
-struct socket *_kaccept(struct socket *sd)
+struct socket *_kaccept(struct socket *sock)
{
- struct socket *new_sd = NULL;
+ struct socket *new_sock = NULL;
int ret;
-
- ret = kernel_accept(sd, &new_sd, sd->file->f_flags);
+
+ ret = kernel_accept(sock, &new_sock, sock->file->f_flags);
if (ret < 0) {
printk(KERN_INFO "kernel_accept error: %d\n", ret);
- return(new_sd);
+ return(new_sock);
}
/* TBD: shall we check name for validity? */
- return(new_sd);
+ return(new_sock);
}
/*
* receive a message this may return after partial send
*/
-int _krecvmsg(struct socket *sd, void *buf, size_t len, unsigned msgflags)
+int _krecvmsg(struct socket *sock, void *buf, size_t len)
{
struct kvec iov = {buf, len};
- struct msghdr msg = {.msg_flags = msgflags};
+ struct msghdr msg = {.msg_flags = 0};
int rlen = 0; /* length read */
printk(KERN_INFO "entered krevmsg\n");
msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
/* receive one kvec for now... */
- rlen = kernel_recvmsg(sd, &msg, &iov, 1, len, msg.msg_flags);
+ rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
if (rlen < 0) {
printk(KERN_INFO "kernel_recvmsg error: %d\n", rlen);
}
+ /* TBD: kernel_recvmsg doesn't fill in the name and namelen
+ */
return(rlen);
}
/*
* Send a message this may return after partial send
*/
-int _ksendmsg(struct socket *sd, struct kvec *iov,
- size_t kvlen, size_t len, unsigned msgflags)
+int _ksendmsg(struct socket *sock, struct kvec *iov, size_t kvlen, size_t len)
{
- struct msghdr msg = {.msg_flags = msgflags};
+ struct msghdr msg = {.msg_flags = 0};
int rlen = 0;
printk(KERN_INFO "entered ksendmsg\n");
msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
- rlen = kernel_sendmsg(sd, &msg, iov, kvlen, len);
+ rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len);
if (rlen < 0) {
printk(KERN_INFO "kernel_sendmsg error: %d\n", rlen);
}
return(rlen);
}
-struct sockaddr *_kgetname(struct socket *sd)
+struct sockaddr *_kgetname(struct socket *sock)
{
struct sockaddr *saddr = NULL;
int len;
int ret;
- if ((ret = sd->ops->getname(sd, (struct sockaddr *)saddr,
+ if ((ret = sock->ops->getname(sock, (struct sockaddr *)saddr,
&len, 2) < 0)) {
printk(KERN_INFO "kernel getname error: %d\n", ret);
}
return(saddr);
}
+/*
+ * workqueue initialization
+ */
+
+int work_init(void)
+{
+ int ret = 0;
+
+ printk(KERN_INFO "entered work_init\n");
+ /*
+ * Create a num CPU threads to handle receive requests
+ * note: we can create more threads if needed to even out
+ * the scheduling of multiple requests..
+ */
+ recv_wq = create_workqueue("ceph-recv");
+ ret = IS_ERR(recv_wq);
+ if (ret) {
+ printk(KERN_INFO "receive worker failed to start: %d\n", ret);
+ destroy_workqueue(recv_wq);
+ return ret;
+ }
+
+ /*
+ * Create a single thread to handle send requests
+ * note: may use same thread pool as receive workers later...
+ */
+ send_wq = create_singlethread_workqueue("ceph-send");
+ ret = IS_ERR(send_wq);
+ if (ret) {
+ printk(KERN_INFO "send worker failed to start: %d\n", ret);
+ destroy_workqueue(send_wq);
+ return ret;
+ }
+ printk(KERN_INFO "successfully created wrkqueues\n");
+
+ return(ret);
+}
+
+/*
+ * workqueue shutdown
+ */
+void shutdown_workqueues(void)
+{
+ destroy_workqueue(send_wq);
+ destroy_workqueue(recv_wq);
+}
#define _FS_CEPH_TCP_H
/* prototype definitions */
-struct socket * _kconnect(struct sockaddr *);
-struct socket * _klisten(struct sockaddr_in *);
+int _kconnect(struct ceph_connection *);
+int _klisten(struct ceph_messenger *);
struct socket *_kaccept(struct socket *);
-int _krecvmsg(struct socket *, void *, size_t , unsigned);
-int _ksendmsg(struct socket *, struct kvec *, size_t, size_t, unsigned);
+int _krecvmsg(struct socket *, void *, size_t );
+int _ksendmsg(struct socket *, struct kvec *, size_t, size_t);
/* Well known port for ceph client listener.. */
#define CEPH_PORT 2002
con->msgr = msgr;
spin_lock_init(&con->con_lock);
+ set_bit(NEW, &con->state);
INIT_WORK(&con->rwork, try_read); /* setup work structure */
INIT_WORK(&con->swork, try_write); /* setup work structure */
spin_lock(&msgr->con_lock);
list_del(&con->list_all);
- if (con->state == CONNECTING ||
- con->state == OPEN) {
+ if (test_bit(CONNECTING, &con->state) ||
+ test_bit(OPEN, &con->state)) {
/* remove from con_open too */
if (list_empty(&con->list_bucket)) {
/* last one */
spin_unlock(&msgr->con_lock);
put_connection(old); /* dec reference count */
}
-
-
-/*
- * initiate connection to a remote socket.
- *
- * TBD: make this async, somehow!
- */
-static int do_connect(struct ceph_connection *con)
-{
- struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
-
- dout(1, "do_connect on %p\n", con);
- con->sock = _kconnect(paddr);
- if (IS_ERR(con->sock)) {
- con->sock = 0;
- return PTR_ERR(con->sock);
- }
-
- /* setup callbacks */
-
-
- return 0;
-}
-
/*
* non-blocking versions
int ret;
while (con->out_kvec_bytes > 0) {
- ret = _ksendmsg(con->sock, con->out_kvec_cur, con->out_kvec_left, con->out_kvec_bytes, 0);
+ ret = _ksendmsg(con->sock, con->out_kvec_cur, con->out_kvec_left, con->out_kvec_bytes);
if (ret < 0) return ret; /* error */
if (ret == 0) return 0; /* socket full */
con->out_kvec_bytes -= ret;
kv.iov_base = kmap(msg->pages[con->out_msg_pos.page]) + con->out_msg_pos.page_pos;
kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
(int)(msg->hdr.data_len - con->out_msg_pos.data_pos));
- ret = _ksendmsg(con->sock, &kv, 1, kv.iov_len, 0);
+ ret = _ksendmsg(con->sock, &kv, 1, kv.iov_len);
if (ret < 0) return ret;
if (ret == 0) return 0; /* socket full */
con->out_msg_pos.data_pos += ret;
if (ret == 0)
goto done;
- if (con->state == REJECTING) {
+ if (test_bit(REJECTING, &con->state)) {
/* FIXME do something else here, pbly? */
remove_connection(msgr, con);
- con->state = CLOSED;
+ set_bit(CLOSED, &con->state);
put_connection(con);
}
/* header */
while (con->in_base_pos < sizeof(struct ceph_msg_header)) {
left = sizeof(struct ceph_msg_header) - con->in_base_pos;
- ret = _krecvmsg(con->sock, &m->hdr + con->in_base_pos, left, 0);
+ ret = _krecvmsg(con->sock, &m->hdr + con->in_base_pos, left);
if (ret <= 0) return ret;
con->in_base_pos += ret;
if (con->in_base_pos == sizeof(struct ceph_msg_header)) {
return -ENOMEM;
}
left = m->hdr.front_len - m->front.iov_len;
- ret = _krecvmsg(con->sock, (char*)m->front.iov_base + m->front.iov_len, left, 0);
+ ret = _krecvmsg(con->sock, (char*)m->front.iov_base + m->front.iov_len, left);
if (ret <= 0) return ret;
m->front.iov_len += ret;
}
left = min((int)(m->hdr.data_len - con->in_msg_pos.data_pos),
(int)(PAGE_SIZE - con->in_msg_pos.page_pos));
p = kmap(m->pages[con->in_msg_pos.page]);
- ret = _krecvmsg(con->sock, p + con->in_msg_pos.page_pos, left, 0);
+ ret = _krecvmsg(con->sock, p + con->in_msg_pos.page_pos, left);
if (ret <= 0) return ret;
con->in_msg_pos.data_pos += ret;
con->in_msg_pos.page_pos += ret;
{
while (con->in_base_pos < sizeof(con->in_partial_ack)) {
int left = sizeof(con->in_partial_ack) - con->in_base_pos;
- int ret = _krecvmsg(con->sock, (char*)&con->in_partial_ack + con->in_base_pos, left, 0);
+ int ret = _krecvmsg(con->sock, (char*)&con->in_partial_ack + con->in_base_pos, left);
if (ret <= 0) return ret;
con->in_base_pos += ret;
}
/* peer addr */
while (con->in_base_pos < sizeof(con->peer_addr)) {
int left = sizeof(con->peer_addr) - con->in_base_pos;
- ret = _krecvmsg(con->sock, (char*)&con->peer_addr + con->in_base_pos, left, 0);
+ ret = _krecvmsg(con->sock, (char*)&con->peer_addr + con->in_base_pos, left);
if (ret <= 0) return ret;
con->in_base_pos += ret;
}
while (con->in_base_pos < sizeof(con->peer_addr) + sizeof(con->connect_seq)) {
int off = con->in_base_pos - sizeof(con->peer_addr);
int left = sizeof(con->peer_addr) + sizeof(con->connect_seq) - con->in_base_pos;
- ret = _krecvmsg(con->sock, (char*)&con->connect_seq + off, left,0);
+ ret = _krecvmsg(con->sock, (char*)&con->connect_seq + off, left);
if (ret <= 0) return ret;
con->in_base_pos += ret;
}
existing = get_connection(con->msgr, &con->peer_addr);
if (existing) {
spin_lock(&existing->con_lock);
- if ((existing->state == CONNECTING && compare_addr(&con->msgr->inst.addr, &con->peer_addr)) ||
- (existing->state == OPEN && con->connect_seq == existing->connect_seq)) {
+ if ((test_bit(CONNECTING, &existing->state) &&
+ compare_addr(&con->msgr->inst.addr, &con->peer_addr)) ||
+ (test_bit(OPEN, &existing->state) &&
+ con->connect_seq == existing->connect_seq)) {
/* replace existing with new connection */
replace_connection(con->msgr, existing, con);
/* steal message queue */
list_splice_init(&con->out_queue, &existing->out_queue); /* fixme order */
con->out_seq = existing->out_seq;
- con->state = OPEN;
- existing->state = CLOSED;
+ set_bit(OPEN, &con->state);
+ set_bit(CLOSED, &existing->state);
} else {
/* reject new connection */
- con->state = REJECTING;
+ set_bit(REJECTING, &con->state);
con->connect_seq = existing->connect_seq; /* send this with the reject */
}
spin_unlock(&existing->con_lock);
put_connection(existing);
} else {
add_connection(con->msgr, con);
- con->state = OPEN;
+ set_bit(OPEN, &con->state);
}
spin_unlock(&con->msgr->con_lock);
/* the result? */
- if (con->state == REJECTING)
+ if (test_bit(REJECTING, &con->state))
prepare_write_accept_reject(con);
else
prepare_write_accept_ready(con);
/*
* TBD: maybe store error in ceph_connection
*/
- /* if (con->state == CLOSED) return -1; */
if (test_bit(CLOSED, &con->state)) goto done;
if (test_bit(ACCEPTING, &con->state)) {
ret = read_accept_partial(con);
- /* TBD: do something with error */
- /* if (ret <= 0) return ret; */
if (ret <= 0) goto done;
/* accepted */
process_accept(con);
}
if (con->in_tag == CEPH_MSGR_TAG_READY) {
- ret = _krecvmsg(con->sock, &con->in_tag, 1, 0);
- /* if (ret <= 0) return ret; */
- /* TBD: do something with error */
+ ret = _krecvmsg(con->sock, &con->in_tag, 1);
if (ret <= 0) goto done;
if (con->in_tag == CEPH_MSGR_TAG_MSG)
prepare_read_message(con);
}
if (con->in_tag == CEPH_MSGR_TAG_MSG) {
ret = read_message_partial(con);
- /* if (ret <= 0) return ret; */
- /* TBD: do something with error */
if (ret <= 0) goto done;
/* got a full message! */
msgr->dispatch(con->msgr->parent, con->in_msg);
}
if (con->in_tag == CEPH_MSGR_TAG_ACK) {
ret = read_ack_partial(con);
- /* if (ret <= 0) return ret; */
- /* TBD: do something with error */
if (ret <= 0) goto done;
/* got an ack */
process_ack(con, con->in_partial_ack);
bad:
BUG_ON(1); /* shouldn't get here */
done:
+ con->error = ret;
return;
}
struct ceph_messenger *ceph_messenger_create()
{
struct ceph_messenger *msgr;
- struct sockaddr_in saddr;
+ int ret = 0;
msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
if (msgr == NULL)
spin_lock_init(&msgr->con_lock);
/* create listening socket */
- msgr->listen_sock = _klisten(&saddr);
- if (IS_ERR(msgr->listen_sock)) {
- int err = PTR_ERR(msgr->listen_sock);
+ ret = _klisten(msgr);
+ if(ret < 0) {
kfree(msgr);
- return ERR_PTR(err);
+ return ERR_PTR(ret);
}
- /* determine my ip:port */
- msgr->inst.addr.ipaddr.sin_family = saddr.sin_family;
- msgr->inst.addr.ipaddr.sin_port = saddr.sin_port;
- msgr->inst.addr.ipaddr.sin_addr = saddr.sin_addr;
-
/* TBD: setup callback for accept */
INIT_WORK(&msgr->awork, try_accept); /* setup work structure */
int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg)
{
struct ceph_connection *con;
+ int ret = 0;
/* set source */
msg->hdr.src = msgr->inst;
ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr),
ntohl(msg->hdr.dst.addr.ipaddr.sin_port));
con->peer_addr = msg->hdr.dst.addr;
- con->state = CONNECTING;
add_connection(msgr, con);
} else {
dout(5, "had connection to peer %x:%d\n",
spin_lock(&con->con_lock);
/* initiate connect? */
- if (con->state == CONNECTING)
- do_connect(con);
+ if (test_bit(NEW, &con->state)) {
+ ret = _kconnect(con);
+ if (ret < 0){
+ derr(1, "connection failure to peer %x:%d\n",
+ ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr),
+ ntohl(msg->hdr.dst.addr.ipaddr.sin_port));
+ remove_connection(msgr, con);
+ kfree(con);
+ return(ret);
+
+ }
+ }
/* queue */
dout(1, "queuing outgoing message for %s.%d\n",
spin_unlock(&con->con_lock);
put_connection(con);
- return 0;
+ return ret;
}
/* current state of connection */
-enum ceph_connection_state {
- NEW = 1,
- ACCEPTING = 2,
- CONNECTING = 4,
- OPEN = 8,
- REJECTING = 16,
- CLOSED = 32,
- READ_PEND = 64,
- WRITE_PEND = 128
-};
+#define NEW 1
+#define CONNECTING 2
+#define ACCEPTING 3
+#define OPEN 4
+#define WRITE_PEND 5
+#define REJECTING 6
+#define CLOSED 7
struct ceph_connection {
struct ceph_messenger *msgr;
struct socket *sock; /* connection socket */
+ __u32 state; /* connection state */
atomic_t nref;
spinlock_t con_lock; /* connection lock */
struct list_head list_bucket; /* msgr->con_open or con_accepting */
struct ceph_entity_addr peer_addr; /* peer address */
- enum ceph_connection_state state;
__u32 connect_seq;
__u32 out_seq; /* last message queued for send */
__u32 in_seq, in_seq_acked; /* last message received, acked */