#include "messenger.h"
#include "ktcp.h"
-static struct workqueue_struct *recv_wq; /* receive work queue ) */
-static struct workqueue_struct *send_wq; /* send work queue */
-
/* static tag bytes */
static char tag_ready = CEPH_MSGR_TAG_READY;
static char tag_reject = CEPH_MSGR_TAG_REJECT;
static char tag_ack = CEPH_MSGR_TAG_ACK;
static char tag_close = CEPH_MSGR_TAG_CLOSE;
+static void try_read(struct work_struct *);
+static void try_write(struct work_struct *);
+static void try_accept(struct work_struct *);
/*
* connections
spin_lock_init(&con->con_lock);
INIT_WORK(&con->rwork, try_read); /* setup work structure */
- INIT_WORK(&con->rwork, try_write); /* setup work structure */
+ INIT_WORK(&con->swork, try_write); /* setup work structure */
atomic_inc(&con->nref);
return con;
more:
len = bl->b_kv[p->i_kv].iov_len - p->i_off;
/* FIXME */
- ret = kernel_send(con->sock, bl->b_kv[p->i_kv].iov_base + p->i_off, len);
+ /* ret = kernel_send(con->sock, bl->b_kv[p->i_kv].iov_base + p->i_off, len); */
if (ret < 0) return ret;
if (ret == 0) return 0; /* socket full */
if (ret + p->i_off == bl->b_kv[p->i_kv].iov_len) {
/*
* call when socket is writeable
*/
-static int try_write(struct ceph_messenger *msgr, struct work_struct *work)
+static void try_write(struct work_struct *work)
{
int ret;
struct ceph_connection *con;
+ struct ceph_messenger *msgr;
con = container_of(work, struct ceph_connection, swork);
+ msgr = con->msgr;
more:
/* data queued? */
if (con->out_partial.b_kvlen) {
ret = write_partial(con);
- if (ret == 0) return 0;
+ if (ret == 0) goto done;
/* error or success */
/* clean up */
put_connection(con);
}
- if (ret < 0) return ret; /* error */
+ /* TBD: handle error; return for now */
+ if (ret < 0) goto done; /* error */
}
/* anything else pending? */
}
/* hmm, nothing to do! */
- return 0;
+done:
+ return;
}
static int read_message_partial(struct ceph_connection *con)
{
struct ceph_message *m = con->in_partial;
- int left, ret, s, chunkbytes, c, did;
+ int ret, s, chunkbytes, c, did;
+ size_t left;
while (con->in_base_pos < sizeof(struct ceph_message_header)) {
left = sizeof(struct ceph_message_header) - con->in_base_pos;
- ret = _read(con->sock, &m->hdr + con->in_base_pos, left);
+ ret = _krecvmsg(con->sock, &m->hdr + con->in_base_pos, left, 0);
if (ret <= 0) return ret;
con->in_base_pos += ret;
}
while (con->in_base_pos < sizeof(struct ceph_message_header) + chunkbytes) {
int off = con->in_base_pos - sizeof(struct ceph_message_header);
left = chunkbytes + sizeof(struct ceph_message_header) - con->in_base_pos;
- ret = _read(con->sock, (char*)m->chunklens + off, left);
+ ret = _krecvmsg(con->sock, (char*)m->chunklens + off, left, 0);
if (ret <= 0) return ret;
con->in_base_pos += ret;
}
}
ceph_bl_prepare_append(&m->payload, left);
s = min(m->payload.b_append.iov_len, left);
- ret = _read(con->sock, m->payload.b_append.iov_base, s);
+ ret = _krecvmsg(con->sock, m->payload.b_append.iov_base, s, 0);
if (ret <= 0) return ret;
ceph_bl_append_copied(&m->payload, s);
goto more;
{
while (con->in_base_pos < sizeof(con->in_partial_ack)) {
int left = sizeof(con->in_partial_ack) - con->in_base_pos;
- int ret = _read(con->sock, (char*)&con->in_partial_ack + con->in_base_pos, left);
+ int ret = _krecvmsg(con->sock, (char*)&con->in_partial_ack + con->in_base_pos, left, 0);
if (ret <= 0) return ret;
con->in_base_pos += ret;
}
static int read_accept_partial(struct ceph_connection *con)
{
+ int ret;
+
/* peer addr */
while (con->in_base_pos < sizeof(con->peer_addr)) {
int left = sizeof(con->peer_addr) - con->in_base_pos;
- int ret = _read(con->sock, (char*)&con->peer_addr + con->in_base_pos, left);
+ ret = _krecvmsg(con->sock, (char*)&con->peer_addr + con->in_base_pos, left, 0);
if (ret <= 0) return ret;
con->in_base_pos += ret;
}
while (con->in_base_pos < sizeof(con->peer_addr) + sizeof(con->connect_seq)) {
int off = con->in_base_pos - sizeof(con->peer_addr);
int left = sizeof(con->peer_addr) + sizeof(con->connect_seq) - con->in_base_pos;
- ret = _read(con->sock, (char*)&con->connect_seq + off, left);
+ ret = _krecvmsg(con->sock, (char*)&con->connect_seq + off, left,0);
if (ret <= 0) return ret;
con->in_base_pos += ret;
}
/*
* prepare to read a message
*/
-static int prepare_read_message(struct ceph_connection *con)
+static void prepare_read_message(struct ceph_connection *con)
{
con->in_tag = CEPH_MSGR_TAG_MSG;
con->in_base_pos = 0;
con->in_partial = kmalloc(sizeof(struct ceph_message), GFP_KERNEL);
- if (!con->in_partial) return -1; /* crap */
+ if (!con->in_partial) {
+ /* TBD: we don't check for error in caller, handle error */
+ printk(KERN_INFO "malloc failure\n");
+ goto done;
+ }
+
ceph_get_msg(con->in_partial);
ceph_bl_init(&con->in_partial->payload);
ceph_bl_iterator_init(&con->in_pos);
+done:
+ return;
}
/*
/*
* call when data is available on the socket
*/
-static int try_read(struct work_struct *work)
+static void try_read(struct work_struct *work)
{
int ret = -1;
struct ceph_connection *con;
+ struct ceph_messenger *msgr;
con = container_of(work, struct ceph_connection, rwork);
+ msgr = con->msgr;
more:
- if (con->state == CLOSED) return -1;
- if (con->state == ACCEPTING) {
+ /*
+ * TBD: maybe store error in ceph_connection
+ * since this is run in a workqueue, we probably need to notify
+ * whoever added the connection to poll list of completion for
+ * dispatching. Or error
+ */
+ /* if (con->state == CLOSED) return -1; */
+
+ if (test_bit(CLOSED, &con->state)) goto done;
+ if (test_bit(ACCEPTING, &con->state)) {
ret = read_accept_partial(con);
- if (ret <= 0) return ret;
+ /* TBD: do something with error */
+ /* if (ret <= 0) return ret; */
+ if (ret <= 0) goto done;
/* accepted */
process_accept(con);
goto more;
}
if (con->in_tag == CEPH_MSGR_TAG_READY) {
- ret = _read(con->sock, &con->in_tag, 1);
- if (ret <= 0) return ret;
+ ret = _krecvmsg(con->sock, &con->in_tag, 1, 0);
+ /* if (ret <= 0) return ret; */
+ /* TBD: do something with error */
+ if (ret <= 0) goto done;
if (con->in_tag == CEPH_MSGR_TAG_MSG)
prepare_read_message(con);
else if (con->in_tag == CEPH_MSGR_TAG_ACK)
}
if (con->in_tag == CEPH_MSGR_TAG_MSG) {
ret = read_message_partial(con);
- if (ret <= 0) return ret;
+ /* if (ret <= 0) return ret; */
+ /* TBD: do something with error */
+ if (ret <= 0) goto done;
/* got a full message! */
msgr->dispatch(con->msgr->parent, con->in_partial);
ceph_put_msg(con->in_partial);
}
if (con->in_tag == CEPH_MSGR_TAG_ACK) {
ret = read_ack_partial(con);
- if (ret <= 0) return ret;
+ /* if (ret <= 0) return ret; */
+ /* TBD: do something with error */
+ if (ret <= 0) goto done;
/* got an ack */
process_ack(con, con->in_partial_ack);
con->in_tag = CEPH_MSGR_TAG_READY;
}
bad:
BUG_ON(1); /* shouldn't get here */
- return ret;
+done:
+ return;
}
/*
* Accepter thread
*/
-static int try_accept(struct work_struct *work)
+static void try_accept(struct work_struct *work)
{
struct socket *sd, *new_sd;
struct sockaddr saddr;
struct ceph_connection *con;
struct ceph_connection *new_con = NULL;
+ struct ceph_messenger *msgr;
+ int len;
con = container_of(work, struct ceph_connection, awork);
+ msgr = con->msgr;
+ sd = con->sock;
printk(KERN_INFO "Entered try_accept\n");
/* get the address at the other end */
memset(&saddr, 0, sizeof(saddr));
- if (new_sd->ops->getname(new_sd, saddr, &len, 2)) {
+ if (new_sd->ops->getname(new_sd, &saddr, &len, 2)) {
printk(KERN_INFO "getname error connection aborted\n");
sock_release(new_sd);
goto done;
goto done;
}
new_con->sock = new_sd;
- setbit(ACCEPTING, &con->state);
+ set_bit(ACCEPTING, &con->state);
new_con->in_tag = CEPH_MSGR_TAG_READY;
- new_con->peeraddr = saddr;
+ /* TBD: fill in part of peers address */
+ /* new_con->peeraddr = saddr; */
/* TBD: may not use this.. */
new_sd->sk->sk_user_data = con;
/* hand off to worker threads , send pending */
/*?? queue_work(send_wq, &new_con->swork);*/
done:
- return(0);
-}
-
-
-int ceph_work_init(void)
-{
- int ret = 0;
-
- /*
- * Create a num CPU threads to handle receive requests
- * note: we can create more threads if needed to even out
- * the scheduling of multiple requests..
- */
- recv_wq = create_workqueue("ceph recv");
- ret = IS_ERR(recv_wq);
- if (ret) {
- printk(KERN_INFO "receive worker failed to start: %d\n", ret);
- destroy_workqueue(recv_wq);
- return ret;
- }
-
- /*
- * Create a single thread to handle send requests
- * note: may use same thread pool as receive workers later...
- */
- send_wq = create_singlethread_workqueue("ceph send");
- ret = IS_ERR(send_wq);
- if (ret) {
- printk(KERN_INFO "send worker failed to start: %d\n", ret);
- destroy_workqueue(send_wq);
- return ret;
- }
-
- return(ret);
-}
-
-void ceph_work_shutdown(void)
-{
- destroy_workqueue(send_wq);
- destroy_workqueue(recv_wq);
+ return;
}
struct ceph_connection *new_listener(struct ceph_messenger *msgr)
/*
* create a new messenger
*/
-static struct ceph_messenger *new_messenger()
+static struct ceph_messenger *new_messenger(void)
{
struct ceph_messenger *msgr;
struct ceph_connection *con;
- struct ceph_pollable *pfile;
+ struct ceph_pollable *pfiles;
msgr = kmalloc(sizeof(struct ceph_messenger), GFP_KERNEL);
if (msgr == NULL) return 0;
* TBD: maybe do this before start polling */
pfiles->con = con;
pfiles->file = con->sock->file;
- list_add(&pfiles->poll_list); /* add to poll list */
+ list_add(&pfiles->poll_list, &msgr->poll_task->pfiles->poll_list); /* add to poll list */
return msgr;
}