]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
accepter bits; no connect, yet
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 2 Nov 2007 22:51:30 +0000 (22:51 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 2 Nov 2007 22:51:30 +0000 (22:51 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2017 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/include/ceph_fs.h
trunk/ceph/kernel/kmsg.h
trunk/ceph/kernel/messenger.c

index b9d2d09e055ad32b146aea3fe84431e5fd3900b9..c01f11e3bcfa44f630c9aa5c7572f8ca214d979d 100644 (file)
@@ -142,11 +142,11 @@ struct ceph_entity_name {
 #define CEPH_ENTITY_TYPE_CLIENT 4
 #define CEPH_ENTITY_TYPE_ADMIN  5
 
-#define CEPH_MSGR_TAG_READY   0  // server -> client + oseq: ready for messages
-#define CEPH_MSGR_TAG_REJECT  1  // server -> client + oseq: decline socket
-#define CEPH_MSGR_TAG_MSG     2  // message
-#define CEPH_MSGR_TAG_ACK     3  // message ack
-#define CEPH_MSGR_TAG_CLOSE   4  // closing pipe
+#define CEPH_MSGR_TAG_READY   1  // server -> client + cseq: ready for messages
+#define CEPH_MSGR_TAG_REJECT  2  // server -> client + cseq: decline socket
+#define CEPH_MSGR_TAG_MSG     3  // message
+#define CEPH_MSGR_TAG_ACK     4  // message ack
+#define CEPH_MSGR_TAG_CLOSE   5  // closing pipe
 
 
 /*
index cff51cdba7eaed0fce1198d299f9da41dee37c91..b1b0004cd71540f6a22c0642a219c2d5dcfeeea6 100644 (file)
@@ -14,23 +14,27 @@ extern struct task_struct *athread;
 struct ceph_kmsgr {
        void *m_parent;
        struct task_struct *athread;
-       struct radix_tree_root mpipes;          /* other nodes talk to */
+       struct radix_tree_root connections; /* see get_connection() */
+       struct list_head accepting;         /* connections that aren't open yet */
 };
 
 struct ceph_message {
-       atomic_t nref;
-       int mflags;
-       struct ceph_message_header *msghdr;     /* header */
-       __u32 chunklens[2];
+       struct ceph_message_header hdr; /* header */
+       __u32 chunklen[2];
        struct ceph_bufferlist payload;
-       struct list_head m_list_head;
+
+       struct list_head list_head;
+       atomic_t nref;
 };
 
 /* current state of connection, probably won't need all these.. */
 enum ceph_con_state {
+       NEW,
        ACCEPTING,
        CONNECTING,
        OPEN,
+       REJECTING,
+       CLOSED,
 
        READ_PENDING,
        READING,
@@ -52,31 +56,31 @@ enum ceph_con_state {
 
 struct ceph_connection {
        struct socket *sock;    /* connection socket */
-       /* TDB: may need a mutex here depending if */
-       spinlock_t con_lock;
+       
+       atomic_t nref;
+       spinlock_t con_lock;    /* TDB: may need a mutex here depending if */
 
+       struct ceph_message_addr peer_addr; /* peer address */
+       struct list_head list_head;
        enum ceph_con_state state;
        __u32 connect_seq;     
        __u32 out_seq;               /* last message queued for send */
        __u32 in_seq, in_seq_acked;  /* last message received, acked */
 
-       
        /* out queue */
-/* note: need to adjust queues because we have a work queue for the message */ 
-       spinlock_t out_queue_lock;
+       /* note: need to adjust queues because we have a work queue for the message */ 
        struct list_head out_queue;
-       struct ceph_bufferlist out_partial;
+       struct ceph_bufferlist out_partial;  /* references existing bufferlists; do not free() */
        struct ceph_bufferlist_iterator out_pos;
        struct list_head out_sent;   /* sending/sent but unacked; resend if connection drops */
 
        /* partially read message contents */
-       char in_tag;  /* ack or msg */
+       char in_tag;       /* READY (accepting, or no in-progress read) or ACK or MSG */
+       int in_base_pos;   /* for ack seq, or msg headers, or accept handshake */
        __u32 in_partial_ack;  
-       int in_base_pos;   /* for ack seq, or msg header */
        struct ceph_message *in_partial;
        struct ceph_bufferlist_iterator in_pos;  /* for msg payload */
 
-
        struct work_struct rwork;               /* received work */
        struct work_struct swork;               /* send work */
        int retries;
index b6a0ac49756e855c219800dbe376af085cad452c..8edd647b6ccf509ffdc67fec98c9ad6567e04aa4 100644 (file)
@@ -23,6 +23,118 @@ static char tag_ack = CEPH_MSGR_TAG_ACK;
 static char tag_close = CEPH_MSGR_TAG_CLOSE;
 
 
+/*
+ * connections
+ */
+
+/*
+ * create a new connection.
+ *
+ * the structure is zero-filled, so the initial state is NEW (the first
+ * ceph_con_state enumerator).  the caller owns the single initial
+ * reference and releases it with put_connection().
+ *
+ * returns NULL if the allocation fails.
+ *
+ * the parameter list is left unspecified on purpose: the accepter
+ * currently calls this with a stray argument.
+ */
+static struct ceph_connection *new_connection()
+{
+       struct ceph_connection *con;
+
+       /*
+        * kzalloc() zeroes the object itself.  the previous
+        * memset(&con, 0, sizeof(con)) cleared the local pointer instead,
+        * so every access below dereferenced NULL.
+        */
+       con = kzalloc(sizeof(*con), GFP_KERNEL);
+       if (con == NULL)
+               return NULL;
+
+       spin_lock_init(&con->con_lock);
+       INIT_LIST_HEAD(&con->list_head);
+       INIT_LIST_HEAD(&con->out_queue);
+       INIT_LIST_HEAD(&con->out_sent);
+       INIT_WORK(&con->rwork, ceph_reader);    /* setup work structure */
+
+       atomic_set(&con->nref, 1);
+       return con;
+}
+
+/*
+ * hash a peer address down to a radix tree key.
+ *
+ * the radix_tree has an unsigned long key and void * value.  since
+ * ceph_entity_addr is bigger than that, we use a trivial hash key, and
+ * point to a list_head in ceph_connection, as you would with a hash
+ * table.  in the rare event that the trivial hash collides, we just
+ * traverse the (short) list.
+ *
+ * get_connection(), add_connection() and replace_connection() must all
+ * derive their key here so that they agree on the bucket.
+ */
+static unsigned long connection_key(const struct ceph_entity_addr *addr)
+{
+       /*
+        * widen the address by value.  reading it through an
+        * (unsigned long *) cast would run past the field, and may be
+        * misaligned, where long is wider than s_addr.
+        */
+       unsigned long key = addr->addr.sin_addr.s_addr;
+
+       return key ^ addr->addr.sin_port;
+}
+
+/*
+ * get an existing connection, if any, for given addr.
+ *
+ * on success the connection is returned with an extra reference that
+ * the caller must drop with put_connection().  returns NULL if no
+ * connection to addr is registered.  takes msgr->lock internally.
+ */
+static struct ceph_connection *get_connection(struct ceph_kmsgr *msgr, struct ceph_entity_addr *addr)
+{
+       unsigned long key = connection_key(addr);
+       struct ceph_connection *con;
+       struct list_head *head, *p;
+
+       /* existing? */
+       spin_lock(&msgr->lock);
+       head = radix_tree_lookup(&msgr->connections, key);
+       if (head) {
+               /*
+                * the tree points at the list_head embedded in one of the
+                * connections, so that entry is a candidate as well;
+                * list_for_each() would have skipped it.
+                */
+               p = head;
+               do {
+                       con = list_entry(p, struct ceph_connection, list_head);
+                       /*
+                        * NOTE(review): byte-wise compare; assumes both
+                        * addresses were zeroed before being filled in.
+                        */
+                       if (memcmp(&con->peer_addr, addr, sizeof(*addr)) == 0) {
+                               atomic_inc(&con->nref);
+                               goto out;
+                       }
+                       p = p->next;
+               } while (p != head);
+       }
+       con = NULL;
+out:
+       spin_unlock(&msgr->lock);
+       return con;
+}
+
+/*
+ * drop a reference.  the connection is freed when the last one goes.
+ */
+static void put_connection(struct ceph_connection *con)
+{
+       if (atomic_dec_and_test(&con->nref)) {
+               /* FIXME close socket? */
+               kfree(con);
+       }
+}
+
+/*
+ * add to connections tree, keyed by con->peer_addr.
+ *
+ * the tree takes its own reference on con.  takes msgr->lock
+ * internally.  if the tree insert fails the reference is dropped
+ * again and the connection is left unregistered.
+ */
+static void add_connection(struct ceph_kmsgr *msgr, struct ceph_connection *con)
+{
+       unsigned long key = connection_key(&con->peer_addr);
+       struct list_head *head;
+       int err = 0;
+
+       /* take the tree's reference before con becomes visible */
+       atomic_inc(&con->nref);
+
+       spin_lock(&msgr->lock);
+       head = radix_tree_lookup(&msgr->connections, key);
+       if (head) {
+               /* hash collision: chain onto the existing bucket */
+               list_add(&con->list_head, head);
+       } else {
+               INIT_LIST_HEAD(&con->list_head); /* empty */
+               /*
+                * NOTE(review): this may allocate tree nodes while
+                * msgr->lock is held; confirm the root's gfp mask (or a
+                * preload) allows that.
+                */
+               err = radix_tree_insert(&msgr->connections, key, &con->list_head);
+       }
+       spin_unlock(&msgr->lock);
+
+       if (err) {
+               printk(KERN_ERR "ceph: unable to register connection (%d)\n", err);
+               put_connection(con);
+       }
+}
+
+/*
+ * replace another connection.
+ *
+ * new takes old's place in its hash bucket.  the tree's reference
+ * moves from old to new.  takes msgr->lock internally.
+ */
+static void replace_connection(struct ceph_kmsgr *msgr, struct ceph_connection *old, struct ceph_connection *new)
+{
+       unsigned long key = connection_key(&old->peer_addr);
+       void **slot;
+
+       /* same accounting as add_connection(): the tree holds a ref */
+       atomic_inc(&new->nref);
+
+       spin_lock(&msgr->lock);
+       list_add(&new->list_head, &old->list_head);
+       /*
+        * if old was the entry the tree points at, repoint the tree at
+        * new; otherwise the tree would keep referencing old's list_head
+        * after old has been unlinked and freed.
+        */
+       slot = radix_tree_lookup_slot(&msgr->connections, key);
+       if (slot && *slot == &old->list_head)
+               radix_tree_replace_slot(slot, &new->list_head);
+       list_del(&old->list_head);
+       spin_unlock(&msgr->lock);
+
+       put_connection(old); /* dec reference count */
+}
+
+
+
+
+
 
 /*
  * blocking versions
@@ -111,7 +223,7 @@ static int ceph_send_message(struct ceph_message *message, struct socket *sd)
 /*
  * non-blocking versions
  *
- * these are called while holding a lock on the connection
+ * these should be called while holding con->con_lock
  */
 
 /*
@@ -181,8 +293,29 @@ static void prepare_write_ack(struct ceph_connection *con)
        
        ceph_bl_init(&con->out_partial);  
        ceph_bl_iterator_init(&con->out_pos);
-       ceph_bl_append_copy(&con->out_partial, &tag_ack, 1);
-       ceph_bl_append_copy(&con->out_partial, &con->in_seq_acked, sizeof(con->in_seq_acked));
+       ceph_bl_append_ref(&con->out_partial, &tag_ack, 1);
+       ceph_bl_append_ref(&con->out_partial, &con->in_seq_acked, sizeof(con->in_seq_acked));
+}
+
+/*
+ * re-initialise out_partial and its iterator before queueing new data.
+ */
+static void reset_out_partial(struct ceph_connection *con)
+{
+       ceph_bl_init(&con->out_partial);
+       ceph_bl_iterator_init(&con->out_pos);
+}
+
+/*
+ * accept handshake, step 1: queue our own address (msgr->addr).
+ * the bytes are referenced, not copied.
+ */
+static void prepare_write_accept_announce(struct ceph_connection *con)
+{
+       struct ceph_bufferlist *bl = &con->out_partial;
+
+       reset_out_partial(con);
+       ceph_bl_append_ref(bl, &con->msgr->addr, sizeof(con->msgr->addr));
+}
+
+/*
+ * accept handshake, success: queue the single READY tag byte.
+ */
+static void prepare_write_accept_ready(struct ceph_connection *con)
+{
+       struct ceph_bufferlist *bl = &con->out_partial;
+
+       reset_out_partial(con);
+       ceph_bl_append_ref(bl, &tag_ready, 1);
+}
+
+/*
+ * accept handshake, failure: queue the REJECT tag byte followed by
+ * con->connect_seq.  connect_seq is referenced in place, so it
+ * presumably has to stay unchanged until the write completes.
+ */
+static void prepare_write_accept_reject(struct ceph_connection *con)
+{
+       struct ceph_bufferlist *bl = &con->out_partial;
+
+       reset_out_partial(con);
+       ceph_bl_append_ref(bl, &tag_reject, 1);
+       ceph_bl_append_ref(bl, &con->connect_seq, sizeof(con->connect_seq));
+}
 
 /*
@@ -193,18 +326,26 @@ static int try_write(struct ceph_connection *con)
        int ret;
 
 more:
+       /* data queued? */
        if (con->out_partial.b_kvlen) {
                ret = write_partial(con);
                if (ret == 0) return 0;
 
+               /* error or success */
                /* clean up */
                ceph_bl_init(&con->out_partial);  
                ceph_bl_iterator_init(&con->out_pos);
 
+               if (con->state == REJECTING) {
+                       /* FIXME do something else here, pbly? */
+                       list_remove(&con->list_head);
+                       con->state = CLOSED;  
+               }
+               
                if (ret < 0) return ret; /* error */
        }
        
-       /* what next? */
+       /* anything else pending? */
        if (con->in_seq > con->in_seq_acked) {
                prepare_write_ack(con);
                goto more;
@@ -276,6 +417,28 @@ static int read_ack_partial(struct ceph_connection *con)
        return 1; /* done */
 }
 
+
+/*
+ * read the accept handshake: the peer's address followed by its
+ * connect_seq, stored into con->peer_addr and con->connect_seq.
+ *
+ * con->in_base_pos counts the handshake bytes consumed so far, so the
+ * function can be called again after a short read and will resume
+ * where it stopped.
+ *
+ * returns 1 once both fields are complete; otherwise returns whatever
+ * _read() returned (<= 0).
+ */
+static int read_accept_partial(struct ceph_connection *con)
+{
+       const int addr_len = sizeof(con->peer_addr);
+       const int total_len = addr_len + sizeof(con->connect_seq);
+       int ret;
+
+       /* peer addr */
+       while (con->in_base_pos < addr_len) {
+               int left = addr_len - con->in_base_pos;
+
+               ret = _read(con->sock, (char *)&con->peer_addr + con->in_base_pos, left);
+               if (ret <= 0)
+                       return ret;
+               con->in_base_pos += ret;
+       }
+
+       /* connect_seq */
+       while (con->in_base_pos < total_len) {
+               int off = con->in_base_pos - addr_len;
+               int left = total_len - con->in_base_pos;
+
+               ret = _read(con->sock, (char *)&con->connect_seq + off, left);
+               if (ret <= 0)
+                       return ret;
+               con->in_base_pos += ret;
+       }
+       return 1; /* done */
+}
+
 /* 
  * prepare to read a message
  */
@@ -311,11 +474,60 @@ static void process_ack(struct ceph_connection *con, __u32 ack)
        }
 }
 
+/*
+ * decide the fate of a connection whose accept handshake has been read.
+ *
+ * if no connection to con->peer_addr is registered, con is added and
+ * becomes OPEN.  if one exists, con either replaces it (taking over its
+ * outgoing queue and out_seq) or is marked REJECTING and is given the
+ * existing connect_seq to send back.  the matching READY or REJECT
+ * reply is then queued on con.
+ */
+static void process_accept(struct ceph_kmsgr *msgr, struct ceph_connection *con)
+{
+       struct ceph_connection *existing;
+
+       /*
+        * get_connection(), add_connection() and replace_connection() each
+        * take msgr->lock themselves.  holding that spinlock around them,
+        * as this function used to, would deadlock on the first call.
+        * FIXME: this leaves a window between the lookup and the insert.
+        */
+       existing = get_connection(msgr, &con->peer_addr);
+       if (existing) {
+               spin_lock(&existing->con_lock);
+               if ((existing->state == CONNECTING && compare_addr(&msgr->addr, &con->peer_addr)) ||
+                   (existing->state == OPEN && con->connect_seq == existing->connect_seq)) {
+                       /* replace existing with new connection */
+                       replace_connection(msgr, existing, con);
+                       /*
+                        * steal message queue: move existing's queued
+                        * messages onto con.  list_splice_init() takes the
+                        * source list first and leaves it empty.
+                        */
+                       list_splice_init(&existing->out_queue, &con->out_queue); /* fixme order */
+                       con->out_seq = existing->out_seq;
+                       con->state = OPEN;
+                       existing->state = CLOSED;
+               } else {
+                       /* reject new connection */
+                       con->state = REJECTING;
+                       con->connect_seq = existing->connect_seq; /* send this with the reject */
+               }
+               spin_unlock(&existing->con_lock);
+               put_connection(existing); /* drop the get_connection() ref */
+       } else {
+               /*
+                * NOTE(review): con may still be linked on msgr->accepting
+                * through list_head at this point -- confirm it is unlinked
+                * before being added to the tree.
+                */
+               add_connection(msgr, con);
+               con->state = OPEN;
+       }
+
+       /* the result? */
+       if (con->state == REJECTING)
+               prepare_write_accept_reject(con);
+       else
+               prepare_write_accept_ready(con);
+}
+
+
+/*
+ * call when data is available on the socket
+ */
 static int try_read(struct ceph_connection *con)
 {
        int ret = -1;
 
 more:
+       if (con->state == CLOSED) return -1;
+       if (con->state == ACCEPTING) {
+               ret = read_accept_partial(con);
+               if (ret <= 0) return ret;
+               /* accepted */
+               process_accept(con);
+               goto more;
+       }
+
        if (con->in_tag == CEPH_MSGR_TAG_READY) {
                ret = _read(socket, &con->in_tag, 1);
                if (ret <= 0) return ret;
@@ -382,18 +594,21 @@ static int ceph_accepter(void *unusedfornow)
                 printk(KERN_INFO "accepted connection \n");
                 set_current_state(TASK_INTERRUPTIBLE);
                /* initialize the ceph connection */
-               con = kmalloc(sizeof(struct ceph_connection), GFP_KERNEL);
+               con = new_connection(NULL);
                if (con == NULL) {
                        printk(KERN_INFO "malloc failure\n");
                        sock_release(new_sd);
                        break;
                }
-               con->sock = new_sd;
-               con->state = READ_PENDING;
-               /* setup work structure */
-               INIT_WORK(&con->rwork, ceph_reader);
-               /* hand off to worker threads , read pending */
-               queue_work(recv_wq, &con->rwork);
+               con->socket = new_sd;
+               con->state = ACCEPTING;
+               con->in_tag = CEPH_MSGR_TAG_READY;
+
+               prepare_write_accept_announce(con);
+               list_add(&msgr->accepting, &con->list_head);
+
+               /* hand off to worker threads , read pending */
+               /*?? queue_work(recv_wq, &con->rwork);*/
         }
         set_current_state(TASK_RUNNING);
         printk(KERN_INFO "kernel thread exiting\n");