From e55adeb15d49b249abacfb6a3222da12cbe773af Mon Sep 17 00:00:00 2001 From: patiencew Date: Wed, 14 Nov 2007 17:07:56 +0000 Subject: [PATCH] added connecting to peer and removed junk git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2065 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/kernel/messenger.c | 90 ++++++++++++++++------------------- trunk/ceph/kernel/messenger.h | 22 ++++----- 2 files changed, 50 insertions(+), 62 deletions(-) diff --git a/trunk/ceph/kernel/messenger.c b/trunk/ceph/kernel/messenger.c index 7fa169357f0dc..dbbcbdf7b3404 100644 --- a/trunk/ceph/kernel/messenger.c +++ b/trunk/ceph/kernel/messenger.c @@ -167,6 +167,35 @@ static void replace_connection(struct ceph_messenger *msgr, struct ceph_connecti put_connection(old); /* dec reference count */ } +/* + * Sets up connection to peer address, if already exist return that connection + */ + +struct ceph_connection *ceph_connect_to_peer(struct ceph_messenger *msgr, + struct ceph_entity_addr *peer_addr) +{ + struct sockaddr *paddr = (struct sockaddr *)&peer_addr->ipaddr; + struct ceph_connection *con; + + /* check for open connection already existing and use that */ + if (!(con = get_connection(msgr, peer_addr))) { + + con = new_connection(msgr); + + con->sock = _kconnect(paddr); + if (con->sock == NULL) { + kfree(con); + con = NULL; + { + con->peer_addr.erank = peer_addr->erank; + con->peer_addr.nonce = peer_addr->nonce; + con->peer_addr.ipaddr = peer_addr->ipaddr; + /* setup callbacks */ + } + /* TBD: add connection to connection list */ + return con; +} + /* @@ -499,9 +528,6 @@ static void try_read(struct work_struct *work) more: /* * TBD: maybe store error in ceph_connection - * since this is run in a workqueue, we probably need to notify - * whoever added the connection to poll list of completion for - * dispatching. Or error */ /* if (con->state == CLOSED) return -1; */ @@ -619,64 +645,30 @@ done: return; } -struct ceph_connection *new_listener(struct ceph_messenger *msgr) -{ - struct ceph_connection *con; - struct sockaddr saddr; - memset(&saddr, 0, sizeof(saddr)); - - con = kmalloc(sizeof(*con), GFP_KERNEL); - if (con == NULL) - return NULL; - memset(con, 0, sizeof(*con)); - - /* create listener connection */ - spin_lock_init(&con->con_lock); - INIT_WORK(&con->awork, try_accept); /* setup work structure */ - con->msgr = msgr; - atomic_inc(&con->nref); - - /* TBD: if address specified by mount */ - /* make my address from user specified address, fill in saddr */ - - con->sock = _klisten(&saddr); - return(con); -} - /* - * create a new messenger + * create a new messenger instance, saddr is address specified from mount arg. + * If null, will get created by _klisten() */ -static struct ceph_messenger *new_messenger(void) +struct ceph_messenger *ceph_create_messenger(struct sockaddr *saddr) { struct ceph_messenger *msgr; - struct ceph_connection *listener; msgr = kmalloc(sizeof(*msgr), GFP_KERNEL); if (msgr == NULL) - goto done; + return NULL; memset(msgr, 0, sizeof(*msgr)); spin_lock_init(&msgr->con_lock); - INIT_LIST_HEAD(&msgr->poll_list); - - /* create listener connection */ - listener = new_listener(msgr); - if (listener == NULL) - goto err; - - list_add(&msgr->poll_list, &listener->poll_list); - - /* start up poll thread */ - msgr->poll_task = kthread_run(start_poll, msgr, "ceph-poll"); - if (IS_ERR(msgr->poll_task)) - goto err; -done: + /* create listening socket */ + msgr->listen_sock = _klisten(&saddr); + if (msgr->listen_sock == NULL) { + kfree(msgr); + return NULL; + } + /* TBD: setup callback for accept */ + INIT_WORK(&msgr->awork, try_accept); /* setup work structure */ return msgr; -err: - kfree(msgr); - msgr = NULL; - goto done; } diff --git a/trunk/ceph/kernel/messenger.h b/trunk/ceph/kernel/messenger.h index a25e83d7bfc7b..8b8a46517c992 100644 --- a/trunk/ceph/kernel/messenger.h +++ b/trunk/ceph/kernel/messenger.h @@ -7,23 +7,20 @@ #include #include #include "bufferlist.h" -#include "poll.h" struct ceph_message; typedef void (*ceph_messenger_dispatch_t) (void *p, struct ceph_message *m); - struct ceph_messenger { void *parent; ceph_messenger_dispatch_t dispatch; - struct task_struct *poll_task; - struct ceph_connection *listen_con; /* listening connection */ + struct socket *listen_sock; /* listening socket */ + struct work_struct awork; /* accept work */ struct ceph_entity_addr addr; /* my address */ spinlock_t con_lock; struct list_head con_all; /* all connections */ struct list_head con_accepting; /* doing handshake, or */ - struct list_head poll_list; /* connections polling */ struct radix_tree_root con_open; /* established. see get_connection() */ }; @@ -39,12 +36,13 @@ struct ceph_message { /* current state of connection */ enum ceph_connection_state { NEW = 1, - LISTENING = 2, - ACCEPTING = 4, - CONNECTING = 8, - OPEN = 16, - REJECTING = 32, - CLOSED = 64 + ACCEPTING = 2, + CONNECTING = 4, + OPEN = 8, + REJECTING = 16, + CLOSED = 32, + READ_PEND = 64, + WRITE_PEND = 128 }; struct ceph_connection { @@ -56,7 +54,6 @@ struct ceph_connection { struct list_head list_all; /* msgr->con_all */ struct list_head list_bucket; /* msgr->con_open or con_accepting */ - struct list_head poll_list; /* msgr->poll_list */ struct ceph_entity_addr peer_addr; /* peer address */ enum ceph_connection_state state; @@ -77,7 +74,6 @@ struct ceph_connection { struct ceph_message *in_partial; struct ceph_bufferlist_iterator in_pos; /* for msg payload */ - struct work_struct awork; /* accept work */ struct work_struct rwork; /* received work */ struct work_struct swork; /* send work */ int retries; -- 2.39.5