git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: factor out connection creation; add con_send, con_keepalive
author Sage Weil <sage@newdream.net>
Tue, 25 Aug 2009 23:49:28 +0000 (16:49 -0700)
committer Sage Weil <sage@newdream.net>
Wed, 26 Aug 2009 20:09:09 +0000 (13:09 -0700)
Mmm, much cleaner so far!

src/TODO
src/include/msgr.h
src/kernel/messenger.c
src/kernel/messenger.h

index e360d960be543264d6c7091836faa67a097eedcf..e9f70cc5314544ff0b2f4765773016504ba92ccc 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -26,9 +26,16 @@ v0.14
 - radosgw
 - uclient: fix write vs max_size?
 
-- msgr: unidirectional option
 - mds: put migration vectors in mdsmap
 
+- msgr: unidirectional option
+  - what about mon -> mds/osd messages?
+- kclient msgr fixups
+  - private field in ceph_connection
+  - kill radix_tree; allocate connections in callers (osdc, monc, mdsc)
+  - simplify ceph_ping (should be a single byte)
+
+
 bugs
 - premature filejournal trimming?
 - weird osd_lock contention during osd restart?
index 52d2555743fab85fccf685907c808348fc718ec1..50859cf4f47fdce83d60c4a9f691e8441bf03d81 100644 (file)
@@ -92,6 +92,7 @@ struct ceph_entity_inst {
 #define CEPH_MSGR_TAG_CLOSE         6  /* closing pipe */
 #define CEPH_MSGR_TAG_MSG          10  /* message */
 #define CEPH_MSGR_TAG_ACK          11  /* message ack */
+#define CEPH_MSGR_TAG_KEEPALIVE    12  /* just a keepalive byte! */
 
 
 /*
index 8d618f23ba9410551157d0fe51e0caf6e06491e5..e95ebd664b9111128126b175a0ba25377b61eb55 100644 (file)
@@ -35,6 +35,7 @@
 /* static tag bytes (protocol control messages) */
 static char tag_msg = CEPH_MSGR_TAG_MSG;
 static char tag_ack = CEPH_MSGR_TAG_ACK;
+static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
 
 
 static void ceph_queue_con(struct ceph_connection *con);
@@ -216,15 +217,11 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
 
 
 /*
- * create a new connection.
+ * initialize a new connection
  */
-static struct ceph_connection *new_connection(struct ceph_messenger *msgr)
+void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con,
+                  struct ceph_entity_addr *addr)
 {
-       struct ceph_connection *con;
-
-       con = kzalloc(sizeof(*con), GFP_NOFS);
-       if (con == NULL)
-               return NULL;
        con->msgr = msgr;
        atomic_set(&con->nref, 1);
        INIT_LIST_HEAD(&con->list_all);
@@ -233,9 +230,7 @@ static struct ceph_connection *new_connection(struct ceph_messenger *msgr)
        INIT_LIST_HEAD(&con->out_queue);
        INIT_LIST_HEAD(&con->out_sent);
        INIT_DELAYED_WORK(&con->work, con_work);
-
-       dout("new connection: %p\n", con);
-       return con;
+       con->peer_addr = *addr;
 }
 
 /*
@@ -553,6 +548,20 @@ static void prepare_write_ack(struct ceph_connection *con)
        set_bit(WRITE_PENDING, &con->state);
 }
 
+/*
+ * Prepare to write keepalive byte.
+ */
+static void prepare_write_keepalive(struct ceph_connection *con)
+{
+       dout("prepare_write_keepalive %p\n", con);
+       con->out_kvec[0].iov_base = &tag_keepalive;
+       con->out_kvec[0].iov_len = 1;
+       con->out_kvec_left = 1;
+       con->out_kvec_bytes = 1;
+       con->out_kvec_cur = con->out_kvec;
+       set_bit(WRITE_PENDING, &con->state);
+}
+
 /*
  * Connection negotiation.
  */
@@ -1350,6 +1359,11 @@ more_kvec:
                spin_unlock(&con->out_queue_lock);
        }
 
+       if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) {
+               prepare_write_keepalive(con);
+               goto more_kvec;
+       }
+
        /* Nothing to do! */
        clear_bit(WRITE_PENDING, &con->state);
        dout("try_write nothing else to write.\n");
@@ -1781,9 +1795,10 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
                /* drop lock while we allocate a new connection */
                spin_unlock(&msgr->con_lock);
 
-               newcon = new_connection(msgr);
-               if (IS_ERR(newcon))
-                       return PTR_ERR(con);
+               newcon = kzalloc(sizeof(*con), GFP_NOFS);
+               if (!newcon)
+                       return -ENOMEM;
+               ceph_con_init(msgr, newcon, &msg->hdr.dst.addr);
 
                newcon->out_connect.flags = 0;
                if (!timeout) {
@@ -1860,6 +1875,45 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
        return ret;
 }
 
+/*
+ * Queue up an outgoing message on the given connection.
+ */
+void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
+{
+       /* set source */
+       msg->hdr.src = con->msgr->inst;
+       msg->hdr.orig_src = con->msgr->inst;
+
+       /* queue */
+       spin_lock(&con->out_queue_lock);
+       msg->hdr.seq = cpu_to_le64(++con->out_seq);
+       dout("----- %p %u to %s%d %d=%s len %d+%d+%d -----\n", msg,
+            (unsigned)con->out_seq,
+            ENTITY_NAME(msg->hdr.dst.name), le16_to_cpu(msg->hdr.type),
+            ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
+            le32_to_cpu(msg->hdr.front_len),
+            le32_to_cpu(msg->hdr.middle_len),
+            le32_to_cpu(msg->hdr.data_len));
+       dout("ceph_con_send %p %p seq %llu for %s%d on %p pgs %d\n",
+            con, msg, le64_to_cpu(msg->hdr.seq),
+            ENTITY_NAME(msg->hdr.dst.name), con, msg->nr_pages);
+       list_add_tail(&msg->list_head, &con->out_queue);
+       spin_unlock(&con->out_queue_lock);
+
+       /* if there wasn't anything waiting to send before, queue
+        * new work */
+       if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
+               ceph_queue_con(con);
+}
+
+void ceph_con_keepalive(struct ceph_connection *con)
+{
+       if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
+           test_and_set_bit(WRITE_PENDING, &con->state) == 0)
+               ceph_queue_con(con);
+}
+
+
 /*
  * construct a new message with given type, size
  * the new msg has a ref count of 1.
index 36e935d31468127e26129451dd31a881e1a52c2f..366f415fae01c62546d0ad23279c5cff338663cb 100644 (file)
@@ -54,8 +54,6 @@ static inline const char *ceph_name_type_str(int t)
        }
 }
 
-#define CEPH_MSGR_BACKUP 10  /* backlogged incoming connections */
-
 /* use format string %s%d */
 #define ENTITY_NAME(n)                            \
        ceph_name_type_str(le32_to_cpu((n).type)), \
@@ -127,6 +125,7 @@ struct ceph_msg_pos {
 #define LOSSYTX         0  /* we can close channel or drop messages on errors */
 #define LOSSYRX         1  /* peer may reset/drop messages */
 #define CONNECTING     2
+#define KEEPALIVE_PENDING      3
 #define WRITE_PENDING  4  /* we have data ready to send */
 #define QUEUED          5  /* there is work queued on this connection */
 #define BUSY            6  /* work is being done */
@@ -147,6 +146,8 @@ struct ceph_msg_pos {
  * messages in the case of a TCP disconnect.
  */
 struct ceph_connection {
+       void *private;
+
        struct ceph_messenger *msgr;
        struct socket *sock;
        unsigned long state;    /* connection state (see flags above) */
@@ -168,6 +169,7 @@ struct ceph_connection {
        struct list_head out_queue;
        struct list_head out_sent;   /* sending/sent but unacked */
        u32 out_seq;                 /* last message queued for send */
+       bool out_keepalive_pending;
 
        u32 in_seq, in_seq_acked;  /* last message received, acked */
 
@@ -217,6 +219,13 @@ extern void ceph_msgr_exit(void);
 extern struct ceph_messenger *
 ceph_messenger_create(struct ceph_entity_addr *myaddr);
 extern void ceph_messenger_destroy(struct ceph_messenger *);
+
+extern void ceph_con_init(struct ceph_messenger *msgr,
+                         struct ceph_connection *con,
+                         struct ceph_entity_addr *addr);
+extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
+
+
 extern void ceph_messenger_mark_down(struct ceph_messenger *msgr,
                                     struct ceph_entity_addr *addr);