kclient: more messenger cleanups, comments
author     Sage Weil <sage@newdream.net>
           Sat, 18 Oct 2008 04:12:52 +0000 (21:12 -0700)
committer  Sage Weil <sage@newdream.net>
           Sat, 18 Oct 2008 04:12:52 +0000 (21:12 -0700)
src/kernel/caps.c
src/kernel/messenger.c
src/msg/SimpleMessenger.cc

index 3b0cd19a18dd50dc9d2c63311a77c198393537ad..532157e6414c5c69a281422170806d92a34c951d 100644 (file)
@@ -521,7 +521,7 @@ retry:
 void ceph_flush_snaps(struct ceph_inode_info *ci)
 {
        struct inode *inode = &ci->vfs_inode;
-       
+
        spin_lock(&inode->i_lock);
        __ceph_flush_snaps(ci);
        spin_unlock(&inode->i_lock);
index 21e97dd8626e7eae032e7a3b6e0dd4c4db0b8e7a..1e7044e489773b809953e04bdf86355e1ccf0063 100644 (file)
@@ -428,7 +428,7 @@ static int __register_connection(struct ceph_messenger *msgr,
             atomic_read(&con->nref), atomic_read(&con->nref) + 1);
        atomic_inc(&con->nref);
 
-       /* if were just ACCEPTING this connection, it is already on the 
+       /* if were just ACCEPTING this connection, it is already on the
         * con_all and con_accepting lists. */
        if (test_and_clear_bit(ACCEPTING, &con->state)) {
                list_del_init(&con->list_bucket);
@@ -656,7 +656,7 @@ static void prepare_write_message(struct ceph_connection *con)
        con->out_msg->footer.data_crc = 0;
 
        /* is there a data payload? */
-       if (le32_to_cpu(m->hdr.data_len) == 0) {
+       if (le32_to_cpu(m->hdr.data_len) > 0) {
                /* initialize page iterator */
                con->out_msg_pos.page = 0;
                con->out_msg_pos.page_pos =
@@ -666,7 +666,7 @@ static void prepare_write_message(struct ceph_connection *con)
                con->out_more = 1;  /* data + footer will follow */
        } else {
                /* no, queue up footer too and be done */
-               prepare_write_message_footer(con, v);   
+               prepare_write_message_footer(con, v);
        }
 
        set_bit(WRITE_PENDING, &con->state);
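The flipped comparison in the first hunk above is the substantive fix here: with == 0, the page iterator was being initialized exactly for the messages that carry no data. A minimal standalone sketch of the corrected branch, using le32toh from <endian.h> in place of the kernel's le32_to_cpu (the struct and values are illustrative only):

    #include <endian.h>
    #include <stdint.h>
    #include <stdio.h>

    struct hdr { uint32_t data_len; };        /* little-endian on the wire */

    static void prepare_write(const struct hdr *h)
    {
        if (le32toh(h->data_len) > 0)
            printf("data payload: set up page iterator; footer follows data\n");
        else
            printf("no data: queue the footer immediately\n");
    }

    int main(void)
    {
        struct hdr none = { htole32(0) };
        struct hdr some = { htole32(4096) };
        prepare_write(&none);   /* no data: queue the footer immediately */
        prepare_write(&some);   /* data payload: iterator first, footer later */
        return 0;
    }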
@@ -912,7 +912,7 @@ static int write_partial_msg_pages(struct ceph_connection *con)
                                      con->out_msg_pos.page_pos, len,
                                      MSG_DONTWAIT | MSG_NOSIGNAL |
                                      MSG_MORE);
-               
+
                if (crc && msg->pages)
                        kunmap(page);
 
@@ -936,7 +936,7 @@ static int write_partial_msg_pages(struct ceph_connection *con)
                con->out_msg->footer.flags |=
                        cpu_to_le32(CEPH_MSG_FOOTER_NOCRC);
        con->out_kvec_bytes = 0;
-       con->out_kvec_left = 0; 
+       con->out_kvec_left = 0;
        con->out_kvec_cur = con->out_kvec;
        prepare_write_message_footer(con, 0);
        ret = 1;
@@ -1809,9 +1809,23 @@ bad_tag:
  * Atomically queue work on a connection.  Bump @con reference to
  * avoid races with connection teardown.
  *
- * There is some trickery going on with QUEUED, BUSY, and BACKOFF:
+ * There is some trickery going on with QUEUED, BUSY, and BACKOFF because
+ * we only want a _single_ thread operating on each connection at any point
+ * in time, but we want to use all available CPUs.
+ *
+ * The worker thread only proceeds if it can atomically set BUSY.  It
+ * clears QUEUED and does its thing.  When it thinks it's done, it
+ * clears BUSY, then rechecks QUEUED; if it's set again, it loops
+ * (tries again to set BUSY).
  *
- * 
+ * To queue work, we first set QUEUED, and _then_, if BUSY isn't set, we
+ * try to queue the work.  If that fails (the work is already queued, or
+ * the worker is BUSY), we give up, but leave QUEUED set so that the
+ * worker thread will loop again if necessary.
+ *
+ * BACKOFF is set by the fault error path to prevent the worker thread from
+ * looping or socket events from requeuing work; instead, we schedule delayed
+ * work explicitly.
  */
 static void ceph_queue_con(struct ceph_connection *con)
 {
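The body of ceph_queue_con is elided from this hunk, so here is a userspace sketch of the QUEUED/BUSY handshake described in the comment above, using C11 atomics. The bit helpers, do_work(), and the direct con_work() call are stand-ins for the kernel's test_and_set_bit() and queue_work(), not the actual messenger code:

    #include <stdatomic.h>
    #include <stdbool.h>

    enum { QUEUED_BIT = 0, BUSY_BIT = 1 };    /* illustrative state bits */

    static atomic_uint state;

    static bool test_and_set(unsigned bit)
    {
        return atomic_fetch_or(&state, 1u << bit) & (1u << bit);
    }

    static void clear_bit_(unsigned bit)
    {
        atomic_fetch_and(&state, ~(1u << bit));
    }

    static bool test_bit_(unsigned bit)
    {
        return atomic_load(&state) & (1u << bit);
    }

    static void do_work(void) { /* read/write on the connection */ }

    /* worker: proceed only after atomically claiming BUSY */
    static void con_work(void)
    {
        if (test_and_set(BUSY_BIT))
            return;                /* another thread owns the connection */
    more:
        clear_bit_(QUEUED_BIT);
        do_work();
        clear_bit_(BUSY_BIT);
        /* recheck QUEUED: new work may have arrived while we were BUSY */
        if (test_bit_(QUEUED_BIT) && !test_and_set(BUSY_BIT))
            goto more;
    }

    /* queueing side: set QUEUED first, then kick the worker if it's idle */
    static void queue_con(void)
    {
        atomic_fetch_or(&state, 1u << QUEUED_BIT);
        if (!test_bit_(BUSY_BIT))
            con_work();            /* stands in for queue_work() */
    }

    int main(void) { queue_con(); return 0; }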
@@ -1855,7 +1869,7 @@ more:
                dout(5, "con_work CLOSED\n");
                goto done;
        }
-       if (test_bit(WAIT, &con->state)) {
+       if (test_bit(WAIT, &con->state)) {   /* we are a zombie */
                dout(5, "con_work WAIT\n");
                goto done;
        }
@@ -1865,7 +1879,7 @@ more:
        if (test_and_clear_bit(SOCK_CLOSED, &con->state) ||
            try_read(con) < 0 ||
            try_write(con) < 0)
-               ceph_fault(con);
+               ceph_fault(con);     /* error/fault path */
 
 done:
        clear_bit(BUSY, &con->state);
@@ -1885,8 +1899,8 @@ out:
 
 
 /*
- * failure case
- * A retry mechanism is used with exponential backoff
+ * Generic error/fault handler.  A retry mechanism is used with
+ * exponential backoff.
  */
 static void ceph_fault(struct ceph_connection *con)
 {
@@ -1903,14 +1917,8 @@ static void ceph_fault(struct ceph_connection *con)
 
        con_close_socket(con);
 
-       /* hmm? */
-       BUG_ON(test_bit(WAIT, &con->state));
-
-       /*
-        * If there are no messages in the queue, place the
-        * connection in a STANDBY state.  otherwise, retry with
-        * delay
-        */
+       /* If there are no messages in the queue, place the connection
+        * in a STANDBY state (i.e., don't try to reconnect just yet). */
        spin_lock(&con->out_queue_lock);
        if (list_empty(&con->out_queue)) {
                dout(10, "fault setting STANDBY\n");
@@ -1919,65 +1927,69 @@ static void ceph_fault(struct ceph_connection *con)
                return;
        }
 
+       /* Requeue anything that hasn't been acked, and retry after a
+        * delay. */
+       list_splice_init(&con->out_sent, &con->out_queue);
+       spin_unlock(&con->out_queue_lock);
+
+       /* set BACKOFF so that caller does not loop and to prevent
+        * anything from requeuing work on this connection. */
        dout(10, "fault setting BACKOFF\n");
        set_bit(BACKOFF, &con->state);
+       clear_bit(BUSY, &con->state);  /* to avoid an improbable race */
 
        if (con->delay == 0)
                con->delay = BASE_DELAY_INTERVAL;
        else if (con->delay < MAX_DELAY_INTERVAL)
                con->delay *= 2;
 
-       atomic_inc(&con->nref);
+       /* explicitly schedule work to try to reconnect again later. */
        dout(40, "fault queueing %p %d -> %d delay %lu\n", con,
-            atomic_read(&con->nref) - 1, atomic_read(&con->nref),
+            atomic_read(&con->nref), atomic_read(&con->nref) + 1,
             con->delay);
+       atomic_inc(&con->nref);
        queue_delayed_work(ceph_msgr_wq, &con->work,
                           round_jiffies_relative(con->delay));
-
-       list_splice_init(&con->out_sent, &con->out_queue);
-       spin_unlock(&con->out_queue_lock);
 }
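Two details of the fault path above are worth spelling out: the delay doubles on every consecutive fault until it reaches the cap, and the nref bump now happens before queue_delayed_work() so the connection is guaranteed to outlive the pending work. A standalone illustration of the delay schedule, with made-up values for BASE_DELAY_INTERVAL and MAX_DELAY_INTERVAL (the real constants are defined in the messenger source):

    #include <stdio.h>

    #define BASE_DELAY_INTERVAL  1UL   /* illustrative values only */
    #define MAX_DELAY_INTERVAL  64UL

    int main(void)
    {
        unsigned long delay = 0;
        for (int fault = 1; fault <= 8; fault++) {
            if (delay == 0)
                delay = BASE_DELAY_INTERVAL;
            else if (delay < MAX_DELAY_INTERVAL)
                delay *= 2;
            printf("fault %d -> retry after %lu jiffies\n", fault, delay);
        }
        return 0;   /* 1, 2, 4, 8, 16, 32, 64, 64: capped once it hits MAX */
    }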
 
 
 /*
- *  worker function when listener receives a connect
+ * Handle an incoming connection.
  */
-static void try_accept(struct work_struct *work)
+static void accept_work(struct work_struct *work)
 {
        struct ceph_connection *newcon = NULL;
-       struct ceph_messenger *msgr;
-
-       msgr = container_of(work, struct ceph_messenger, awork);
-
-       dout(5, "Entered try_accept\n");
+       struct ceph_messenger *msgr = container_of(work, struct ceph_messenger,
+                                                  awork);
 
        /* initialize the msgr connection */
        newcon = new_connection(msgr);
        if (newcon == NULL) {
                derr(1, "kmalloc failure accepting new connection\n");
-               goto done;
+               return;
        }
 
-       newcon->connect_seq = 1;
        set_bit(ACCEPTING, &newcon->state);
+       newcon->connect_seq = 1;
        newcon->in_tag = CEPH_MSGR_TAG_READY;  /* eventually, hopefully */
 
        if (ceph_tcp_accept(msgr->listen_sock, newcon) < 0) {
                derr(1, "error accepting connection\n");
                put_connection(newcon);
-               goto done;
+               return;
        }
        dout(5, "accepted connection \n");
 
        prepare_write_accept_hello(msgr, newcon);
        add_connection_accepting(msgr, newcon);
 
-       /* hand off to work queue; we may have missed socket state change */
+       /* queue work explicitly; we may have missed the socket state
+        * change before setting the socket callbacks. */
        ceph_queue_con(newcon);
-done:
-       return;
 }
 
+
+
 /*
  * create a new messenger instance, creates listening socket
  */
@@ -1989,13 +2001,16 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
        msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
        if (msgr == NULL)
                return ERR_PTR(-ENOMEM);
-       INIT_WORK(&msgr->awork, try_accept);
+
+       INIT_WORK(&msgr->awork, accept_work);
        spin_lock_init(&msgr->con_lock);
        INIT_LIST_HEAD(&msgr->con_all);
        INIT_LIST_HEAD(&msgr->con_accepting);
        INIT_RADIX_TREE(&msgr->con_tree, GFP_ATOMIC);
        spin_lock_init(&msgr->global_seq_lock);
 
+       /* the zero page is needed if a request is "canceled" while the message
+        * is being written over the socket. */
        msgr->zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO);
        if (!msgr->zero_page) {
                kfree(msgr);
@@ -2024,7 +2039,6 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
 
        dout(1, "messenger %p listening on %u.%u.%u.%u:%u\n", msgr,
             IPQUADPORT(msgr->inst.addr.ipaddr));
-
        return msgr;
 }
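A sketch of the zero-page trick noted in the hunk above, under the simplifying assumption that a canceled request's data pages can be revoked while a message is mid-write: the sender still owes the peer data_len bytes to keep the stream framing intact, so zeroed bytes are substituted for the revoked pages. The names send_remainder and send_fn are hypothetical:

    #include <stddef.h>
    #include <stdio.h>

    #define PAGE_SIZE 4096

    static const char zero_page[PAGE_SIZE];   /* stands in for msgr->zero_page */

    /* finish writing a message whose data pages were revoked mid-send */
    static void send_remainder(size_t sent, size_t data_len,
                               void (*send_fn)(const void *, size_t))
    {
        while (sent < data_len) {
            size_t len = data_len - sent;
            if (len > PAGE_SIZE)
                len = PAGE_SIZE;
            send_fn(zero_page, len);   /* zero-filled, keeps framing intact */
            sent += len;
        }
    }

    static void sink(const void *buf, size_t len)
    {
        (void)buf;
        printf("sent %zu bytes\n", len);
    }

    int main(void) { send_remainder(0, 10000, sink); return 0; }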
 
@@ -2051,7 +2065,8 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
                     atomic_read(&con->nref) - 1, atomic_read(&con->nref));
                __remove_connection(msgr, con);
 
-               /* in case there's queued work... */
+               /* in case there's queued work.  drop a reference if
+                * we successfully cancel work. */
                spin_unlock(&msgr->con_lock);
                if (cancel_delayed_work_sync(&con->work))
                        put_connection(con);
@@ -2066,12 +2081,11 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
        __free_page(msgr->zero_page);
 
        kfree(msgr);
-
        dout(10, "destroyed messenger %p\n", msgr);
 }
 
 /*
- * mark a peer down.  drop any open connection.
+ * mark a peer down.  drop any open connections.
  */
 void ceph_messenger_mark_down(struct ceph_messenger *msgr,
                              struct ceph_entity_addr *addr)
@@ -2097,9 +2111,12 @@ void ceph_messenger_mark_down(struct ceph_messenger *msgr,
 
 
 /*
- * a single ceph_msg can't be queued for send twice, unless it's
- * already been delivered (i.e. we have the only remaining reference).
- * so, dup the message if there is more than once reference.
+ * A single ceph_msg can't be queued for send twice, unless it's
+ * already been delivered (i.e. we have the only remaining reference),
+ * because of the list_head indicating which queue it is on.
+ *
+ * So, we dup the message if there is more than one reference.  If it has
+ * pages (a data payload), steal the pages away from the old message.
  */
 struct ceph_msg *ceph_msg_maybe_dup(struct ceph_msg *old)
 {
@@ -2113,7 +2130,8 @@ struct ceph_msg *ceph_msg_maybe_dup(struct ceph_msg *old)
                           le32_to_cpu(old->hdr.data_len),
                           le32_to_cpu(old->hdr.data_off),
                           old->pages);
-       BUG_ON(!dup);
+       if (!dup)
+               return ERR_PTR(-ENOMEM);
        memcpy(dup->front.iov_base, old->front.iov_base,
               le32_to_cpu(old->hdr.front_len));
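A toy model of the contract described in the comment above, dup only when we don't hold the sole reference, copy the front, steal the pages; the struct fields and helpers are simplified stand-ins, not the kernel's ceph_msg:

    #include <stdlib.h>
    #include <string.h>

    struct msg {
        int nref;
        char front[32];     /* header/front portion, copied on dup */
        char *pages;        /* data payload, stolen on dup */
    };

    static struct msg *maybe_dup(struct msg *old)
    {
        if (old->nref == 1) {              /* sole owner: reuse as-is */
            old->nref++;
            return old;
        }
        struct msg *dup = calloc(1, sizeof(*dup));
        if (!dup)
            return NULL;                   /* kernel version: ERR_PTR(-ENOMEM) */
        dup->nref = 1;
        memcpy(dup->front, old->front, sizeof(dup->front));
        dup->pages = old->pages;           /* steal the pages, don't copy */
        old->pages = NULL;
        return dup;
    }

    int main(void)
    {
        char payload[] = "data";
        struct msg m = { .nref = 2, .pages = payload };
        struct msg *d = maybe_dup(&m);     /* nref > 1: a copy, pages stolen */
        free(d);
        return 0;
    }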
 
@@ -2129,9 +2147,9 @@ struct ceph_msg *ceph_msg_maybe_dup(struct ceph_msg *old)
 
 
 /*
- * queue up an outgoing message.
+ * Queue up an outgoing message.
  *
- * this consumes a msg reference.  that is, if the caller wants to
+ * This consumes a msg reference.  That is, if the caller wants to
  * keep @msg around, it had better call ceph_msg_get first.
  */
 int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
@@ -2150,13 +2168,17 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
        if (!con) {
                /* drop lock while we allocate a new connection */
                spin_unlock(&msgr->con_lock);
+
                newcon = new_connection(msgr);
                if (IS_ERR(newcon))
                        return PTR_ERR(con);
 
                newcon->out_connect.flags = 0;
-               if (!timeout)
+               if (!timeout) {
+                       dout(10, "ceph_msg_send setting LOSSYTX\n");
                        newcon->out_connect.flags |= CEPH_MSG_CONNECT_LOSSYTX;
+                       set_bit(LOSSYTX, &newcon->state);
+               }
 
                ret = radix_tree_preload(GFP_NOFS);
                if (ret < 0) {
@@ -2189,18 +2211,15 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
                spin_unlock(&msgr->con_lock);
        }
 
-       if (!timeout) {
-               dout(10, "ceph_msg_send setting LOSSYTX\n");
-               set_bit(LOSSYTX, &con->state);
-       }
-
        /* queue */
        spin_lock(&con->out_queue_lock);
+
+       /* avoid queuing multiple PING messages in a row. */
        if (unlikely(le16_to_cpu(msg->hdr.type) == CEPH_MSG_PING &&
                     !list_empty(&con->out_queue) &&
-                    le16_to_cpu(list_entry(con->out_queue.prev, struct ceph_msg,
-                               list_head)->hdr.type) == CEPH_MSG_PING)) {
-               /* don't queue multiple pings in a row */
+                    le16_to_cpu(list_entry(con->out_queue.prev,
+                                           struct ceph_msg,
+                                   list_head)->hdr.type) == CEPH_MSG_PING)) {
                dout(2, "ceph_msg_send dropping dup ping\n");
                ceph_msg_put(msg);
        } else {
@@ -2218,6 +2237,8 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
        }
        spin_unlock(&con->out_queue_lock);
 
+       /* if there wasn't anything waiting to send before, queue
+        * new work */
        if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
                ceph_queue_con(con);
 
@@ -2267,7 +2288,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
        m->nr_pages = calc_pages_for(page_off, page_len);
        m->pages = pages;
 
-       dout(20, "ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len, m->nr_pages);
+       dout(20, "ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len,
+            m->nr_pages);
        return m;
 
 out2:
@@ -2282,9 +2304,10 @@ void ceph_msg_put(struct ceph_msg *m)
        dout(20, "ceph_msg_put %p %d -> %d\n", m, atomic_read(&m->nref),
             atomic_read(&m->nref)-1);
        if (atomic_read(&m->nref) <= 0) {
-               derr(0, "bad ceph_msg_put on %p %llu from %s%d %d=%s len %d+%d\n",
+               derr(0, "bad ceph_msg_put on %p %llu %s%d->%s%d %d=%s %d+%d\n",
                     m, le64_to_cpu(m->hdr.seq),
                     ENTITY_NAME(m->hdr.src.name),
+                    ENTITY_NAME(m->hdr.dst.name),
                     le16_to_cpu(m->hdr.type),
                     ceph_msg_type_name(le16_to_cpu(m->hdr.type)),
                     le32_to_cpu(m->hdr.front_len),
index be9f23e6dd1fe6e3f06ccffba8f8b460960369d2..06a28135a64fb6a859e3e140da6a12ff0afd1ca1 100644 (file)
@@ -794,7 +794,7 @@ int Rank::Pipe::accept()
   dout(10) << "accept sd=" << sd << dendl;
   
   // identify peer
-  char banner[strlen(CEPH_BANNER)];
+  char banner[strlen(CEPH_BANNER)+1];
   rc = tcp_read(sd, banner, strlen(CEPH_BANNER));
   if (rc < 0) {
     dout(10) << "accept couldn't read banner" << dendl;
@@ -802,7 +802,8 @@ int Rank::Pipe::accept()
     return -1;
   }
   if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
-    dout(10) << "accept peer sent bad banner" << dendl;
+    banner[strlen(CEPH_BANNER)] = 0;
+    dout(10) << "accept peer sent bad banner '" << banner << "'" << dendl;
     state = STATE_CLOSED;
     return -1;
   }
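The +1 in the hunk above is the classic strlen off-by-one: strlen(CEPH_BANNER) bytes hold the banner read off the wire, and one extra byte leaves room for the NUL terminator that the second hunk writes before logging the bad banner. A standalone illustration in C (the banner string here is made up, not the real CEPH_BANNER value):

    #include <stdio.h>
    #include <string.h>

    #define CEPH_BANNER "ceph 013\n"       /* illustrative, not the real banner */

    int main(void)
    {
        char banner[strlen(CEPH_BANNER) + 1];       /* +1 for the NUL */
        /* pretend tcp_read() filled the buffer with a peer's bad banner */
        memcpy(banner, "bogus!!!\n", strlen(CEPH_BANNER));
        banner[strlen(CEPH_BANNER)] = 0;            /* safe: we have the +1 */
        printf("accept peer sent bad banner '%s'\n", banner);
        return 0;
    }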