From 2426a002638597a06933480097a3b731ce92c2cd Mon Sep 17 00:00:00 2001 From: patiencew Date: Sat, 10 Nov 2007 22:39:56 +0000 Subject: [PATCH] builds somewhat.. git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2045 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/kernel/messenger.c | 147 +++++++++++++++++----------------- 1 file changed, 73 insertions(+), 74 deletions(-) diff --git a/trunk/ceph/kernel/messenger.c b/trunk/ceph/kernel/messenger.c index 684ea439d47ee..62e22210c6ae9 100644 --- a/trunk/ceph/kernel/messenger.c +++ b/trunk/ceph/kernel/messenger.c @@ -9,9 +9,6 @@ #include "messenger.h" #include "ktcp.h" -static struct workqueue_struct *recv_wq; /* receive work queue ) */ -static struct workqueue_struct *send_wq; /* send work queue */ - /* static tag bytes */ static char tag_ready = CEPH_MSGR_TAG_READY; static char tag_reject = CEPH_MSGR_TAG_REJECT; @@ -19,6 +16,9 @@ static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; static char tag_close = CEPH_MSGR_TAG_CLOSE; +static void try_read(struct work_struct *); +static void try_write(struct work_struct *); +static void try_accept(struct work_struct *); /* * connections @@ -38,7 +38,7 @@ static struct ceph_connection *new_connection(struct ceph_messenger *msgr) spin_lock_init(&con->con_lock); INIT_WORK(&con->rwork, try_read); /* setup work structure */ - INIT_WORK(&con->rwork, try_write); /* setup work structure */ + INIT_WORK(&con->swork, try_write); /* setup work structure */ atomic_inc(&con->nref); return con; @@ -190,7 +190,7 @@ static int write_partial(struct ceph_connection *con) more: len = bl->b_kv[p->i_kv].iov_len - p->i_off; /* FIXME */ - ret = kernel_send(con->sock, bl->b_kv[p->i_kv].iov_base + p->i_off, len); + /* ret = kernel_send(con->sock, bl->b_kv[p->i_kv].iov_base + p->i_off, len); */ if (ret < 0) return ret; if (ret == 0) return 0; /* socket full */ if (ret + p->i_off == bl->b_kv[p->i_kv].iov_len) { @@ -271,18 +271,20 @@ static void prepare_write_accept_reject(struct ceph_connection *con) /* * call when socket is writeable */ -static int try_write(struct ceph_messenger *msgr, struct work_struct *work) +static void try_write(struct work_struct *work) { int ret; struct ceph_connection *con; + struct ceph_messenger *msgr; con = container_of(work, struct ceph_connection, swork); + msgr = con->msgr; more: /* data queued? */ if (con->out_partial.b_kvlen) { ret = write_partial(con); - if (ret == 0) return 0; + if (ret == 0) goto done; /* error or success */ /* clean up */ @@ -296,7 +298,8 @@ more: put_connection(con); } - if (ret < 0) return ret; /* error */ + /* TBD: handle error; return for now */ + if (ret < 0) goto done; /* error */ } /* anything else pending? */ @@ -310,7 +313,8 @@ more: } /* hmm, nothing to do! */ - return 0; +done: + return; } @@ -320,11 +324,12 @@ more: static int read_message_partial(struct ceph_connection *con) { struct ceph_message *m = con->in_partial; - int left, ret, s, chunkbytes, c, did; + int ret, s, chunkbytes, c, did; + size_t left; while (con->in_base_pos < sizeof(struct ceph_message_header)) { left = sizeof(struct ceph_message_header) - con->in_base_pos; - ret = _read(con->sock, &m->hdr + con->in_base_pos, left); + ret = _krecvmsg(con->sock, &m->hdr + con->in_base_pos, left, 0); if (ret <= 0) return ret; con->in_base_pos += ret; } @@ -334,7 +339,7 @@ static int read_message_partial(struct ceph_connection *con) while (con->in_base_pos < sizeof(struct ceph_message_header) + chunkbytes) { int off = con->in_base_pos - sizeof(struct ceph_message_header); left = chunkbytes + sizeof(struct ceph_message_header) - con->in_base_pos; - ret = _read(con->sock, (char*)m->chunklens + off, left); + ret = _krecvmsg(con->sock, (char*)m->chunklens + off, left, 0); if (ret <= 0) return ret; con->in_base_pos += ret; } @@ -349,7 +354,7 @@ static int read_message_partial(struct ceph_connection *con) } ceph_bl_prepare_append(&m->payload, left); s = min(m->payload.b_append.iov_len, left); - ret = _read(con->sock, m->payload.b_append.iov_base, s); + ret = _krecvmsg(con->sock, m->payload.b_append.iov_base, s, 0); if (ret <= 0) return ret; ceph_bl_append_copied(&m->payload, s); goto more; @@ -364,7 +369,7 @@ static int read_ack_partial(struct ceph_connection *con) { while (con->in_base_pos < sizeof(con->in_partial_ack)) { int left = sizeof(con->in_partial_ack) - con->in_base_pos; - int ret = _read(con->sock, (char*)&con->in_partial_ack + con->in_base_pos, left); + int ret = _krecvmsg(con->sock, (char*)&con->in_partial_ack + con->in_base_pos, left, 0); if (ret <= 0) return ret; con->in_base_pos += ret; } @@ -374,10 +379,12 @@ static int read_ack_partial(struct ceph_connection *con) static int read_accept_partial(struct ceph_connection *con) { + int ret; + /* peer addr */ while (con->in_base_pos < sizeof(con->peer_addr)) { int left = sizeof(con->peer_addr) - con->in_base_pos; - int ret = _read(con->sock, (char*)&con->peer_addr + con->in_base_pos, left); + ret = _krecvmsg(con->sock, (char*)&con->peer_addr + con->in_base_pos, left, 0); if (ret <= 0) return ret; con->in_base_pos += ret; } @@ -386,7 +393,7 @@ static int read_accept_partial(struct ceph_connection *con) while (con->in_base_pos < sizeof(con->peer_addr) + sizeof(con->connect_seq)) { int off = con->in_base_pos - sizeof(con->peer_addr); int left = sizeof(con->peer_addr) + sizeof(con->connect_seq) - con->in_base_pos; - ret = _read(con->sock, (char*)&con->connect_seq + off, left); + ret = _krecvmsg(con->sock, (char*)&con->connect_seq + off, left,0); if (ret <= 0) return ret; con->in_base_pos += ret; } @@ -396,15 +403,22 @@ static int read_accept_partial(struct ceph_connection *con) /* * prepare to read a message */ -static int prepare_read_message(struct ceph_connection *con) +static void prepare_read_message(struct ceph_connection *con) { con->in_tag = CEPH_MSGR_TAG_MSG; con->in_base_pos = 0; con->in_partial = kmalloc(sizeof(struct ceph_message), GFP_KERNEL); - if (!con->in_partial) return -1; /* crap */ + if (!con->in_partial) { + /* TBD: we don't check for error in caller, handle error */ + printk(KERN_INFO "malloc failure\n"); + goto done; + } + ceph_get_msg(con->in_partial); ceph_bl_init(&con->in_partial->payload); ceph_bl_iterator_init(&con->in_pos); +done: + return; } /* @@ -473,26 +487,40 @@ static void process_accept(struct ceph_connection *con) /* * call when data is available on the socket */ -static int try_read(struct work_struct *work) +static void try_read(struct work_struct *work) { int ret = -1; struct ceph_connection *con; + struct ceph_messenger *msgr; con = container_of(work, struct ceph_connection, rwork); + msgr = con->msgr; more: - if (con->state == CLOSED) return -1; - if (con->state == ACCEPTING) { + /* + * TBD: maybe store error in ceph_connection + * since this is run in a workqueue, we probably need to notify + * whoever added the connection to poll list of completion for + * dispatching. Or error + */ + /* if (con->state == CLOSED) return -1; */ + + if (test_bit(CLOSED, &con->state)) goto done; + if (test_bit(ACCEPTING, &con->state)) { ret = read_accept_partial(con); - if (ret <= 0) return ret; + /* TBD: do something with error */ + /* if (ret <= 0) return ret; */ + if (ret <= 0) goto done; /* accepted */ process_accept(con); goto more; } if (con->in_tag == CEPH_MSGR_TAG_READY) { - ret = _read(con->sock, &con->in_tag, 1); - if (ret <= 0) return ret; + ret = _krecvmsg(con->sock, &con->in_tag, 1, 0); + /* if (ret <= 0) return ret; */ + /* TBD: do something with error */ + if (ret <= 0) goto done; if (con->in_tag == CEPH_MSGR_TAG_MSG) prepare_read_message(con); else if (con->in_tag == CEPH_MSGR_TAG_ACK) @@ -505,7 +533,9 @@ more: } if (con->in_tag == CEPH_MSGR_TAG_MSG) { ret = read_message_partial(con); - if (ret <= 0) return ret; + /* if (ret <= 0) return ret; */ + /* TBD: do something with error */ + if (ret <= 0) goto done; /* got a full message! */ msgr->dispatch(con->msgr->parent, con->in_partial); ceph_put_msg(con->in_partial); @@ -515,7 +545,9 @@ more: } if (con->in_tag == CEPH_MSGR_TAG_ACK) { ret = read_ack_partial(con); - if (ret <= 0) return ret; + /* if (ret <= 0) return ret; */ + /* TBD: do something with error */ + if (ret <= 0) goto done; /* got an ack */ process_ack(con, con->in_partial_ack); con->in_tag = CEPH_MSGR_TAG_READY; @@ -523,21 +555,26 @@ more: } bad: BUG_ON(1); /* shouldn't get here */ - return ret; +done: + return; } /* * Accepter thread */ -static int try_accept(struct work_struct *work) +static void try_accept(struct work_struct *work) { struct socket *sd, *new_sd; struct sockaddr saddr; struct ceph_connection *con; struct ceph_connection *new_con = NULL; + struct ceph_messenger *msgr; + int len; con = container_of(work, struct ceph_connection, awork); + msgr = con->msgr; + sd = con->sock; printk(KERN_INFO "Entered try_accept\n"); @@ -551,7 +588,7 @@ static int try_accept(struct work_struct *work) /* get the address at the other end */ memset(&saddr, 0, sizeof(saddr)); - if (new_sd->ops->getname(new_sd, saddr, &len, 2)) { + if (new_sd->ops->getname(new_sd, &saddr, &len, 2)) { printk(KERN_INFO "getname error connection aborted\n"); sock_release(new_sd); goto done; @@ -565,9 +602,10 @@ static int try_accept(struct work_struct *work) goto done; } new_con->sock = new_sd; - setbit(ACCEPTING, &con->state); + set_bit(ACCEPTING, &con->state); new_con->in_tag = CEPH_MSGR_TAG_READY; - new_con->peeraddr = saddr; + /* TBD: fill in part of peers address */ + /* new_con->peeraddr = saddr; */ /* TBD: may not use this.. */ new_sd->sk->sk_user_data = con; @@ -580,46 +618,7 @@ static int try_accept(struct work_struct *work) /* hand off to worker threads , send pending */ /*?? queue_work(send_wq, &new_con->swork);*/ done: - return(0); -} - - -int ceph_work_init(void) -{ - int ret = 0; - - /* - * Create a num CPU threads to handle receive requests - * note: we can create more threads if needed to even out - * the scheduling of multiple requests.. - */ - recv_wq = create_workqueue("ceph recv"); - ret = IS_ERR(recv_wq); - if (ret) { - printk(KERN_INFO "receive worker failed to start: %d\n", ret); - destroy_workqueue(recv_wq); - return ret; - } - - /* - * Create a single thread to handle send requests - * note: may use same thread pool as receive workers later... - */ - send_wq = create_singlethread_workqueue("ceph send"); - ret = IS_ERR(send_wq); - if (ret) { - printk(KERN_INFO "send worker failed to start: %d\n", ret); - destroy_workqueue(send_wq); - return ret; - } - - return(ret); -} - -void ceph_work_shutdown(void) -{ - destroy_workqueue(send_wq); - destroy_workqueue(recv_wq); + return; } struct ceph_connection *new_listener(struct ceph_messenger *msgr) @@ -648,11 +647,11 @@ struct ceph_connection *new_listener(struct ceph_messenger *msgr) /* * create a new messenger */ -static struct ceph_messenger *new_messenger() +static struct ceph_messenger *new_messenger(void) { struct ceph_messenger *msgr; struct ceph_connection *con; - struct ceph_pollable *pfile; + struct ceph_pollable *pfiles; msgr = kmalloc(sizeof(struct ceph_messenger), GFP_KERNEL); if (msgr == NULL) return 0; @@ -677,7 +676,7 @@ static struct ceph_messenger *new_messenger() * TBD: maybe do this before start polling */ pfiles->con = con; pfiles->file = con->sock->file; - list_add(&pfiles->poll_list); /* add to poll list */ + list_add(&pfiles->poll_list, &msgr->poll_task->pfiles->poll_list); /* add to poll list */ return msgr; } -- 2.39.5