]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/Pipe: goto fail_unlocked on early failures in accept()
authorSage Weil <sage@inktank.com>
Mon, 17 Jun 2013 21:14:02 +0000 (14:14 -0700)
committerSage Weil <sage@inktank.com>
Wed, 24 Jul 2013 23:20:35 +0000 (16:20 -0700)
Instead of duplicating an incomplete cleanup sequence (that does not
clear_pipe()), goto fail_unlocked and do the cleanup in a generic way.
s/rc/r/ while we are here.

Signed-off-by: Sage Weil <sage@inktank.com>
(cherry picked from commit ec612a5bda119cea52bbac9b2a49ecf1e83b08e5)

src/msg/Pipe.cc

index 2a42b97d92d9ff73fae373bb5e99936919f955ca..bb8d7bdb427ba95cb1639c67989e13bc86af1e92 100644 (file)
@@ -217,69 +217,78 @@ int Pipe::accept()
   // my creater gave me sd via accept()
   assert(state == STATE_ACCEPTING);
   
+  // vars
+  bufferlist addrs;
+  entity_addr_t socket_addr;
+  socklen_t len;
+  int r;
+  char banner[strlen(CEPH_BANNER)+1];
+  bufferlist addrbl;
+  ceph_msg_connect connect;
+  ceph_msg_connect_reply reply;
+  Pipe *existing = 0;
+  bufferptr bp;
+  bufferlist authorizer, authorizer_reply;
+  bool authorizer_valid;
+  uint64_t feat_missing;
+  bool replaced = false;
+  CryptoKey session_key;
+
+  // this should roughly mirror pseudocode at
+  //  http://ceph.newdream.net/wiki/Messaging_protocol
+  int reply_tag = 0;
+  uint64_t existing_seq = -1;
+
+  // used for reading in the remote acked seq on connect
+  uint64_t newly_acked_seq = 0;
+
   // announce myself.
-  int rc = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
-  if (rc < 0) {
+  r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
+  if (r < 0) {
     ldout(msgr->cct,10) << "accept couldn't write banner" << dendl;
-    state = STATE_CLOSED;
-    state_closed.set(1);
-    return -1;
+    goto fail_unlocked;
   }
 
   // and my addr
-  bufferlist addrs;
   ::encode(msgr->my_inst.addr, addrs);
 
   port = msgr->my_inst.addr.get_port();
 
   // and peer's socket addr (they might not know their ip)
-  entity_addr_t socket_addr;
-  socklen_t len = sizeof(socket_addr.ss_addr());
-  int r = ::getpeername(sd, (sockaddr*)&socket_addr.ss_addr(), &len);
+  len = sizeof(socket_addr.ss_addr());
+  r = ::getpeername(sd, (sockaddr*)&socket_addr.ss_addr(), &len);
   if (r < 0) {
     char buf[80];
     ldout(msgr->cct,0) << "accept failed to getpeername " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
-    state = STATE_CLOSED;
-    state_closed.set(1);
-    return -1;
+    goto fail_unlocked;
   }
   ::encode(socket_addr, addrs);
 
-  rc = tcp_write(addrs.c_str(), addrs.length());
-  if (rc < 0) {
+  r = tcp_write(addrs.c_str(), addrs.length());
+  if (r < 0) {
     ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl;
-    state = STATE_CLOSED;
-    state_closed.set(1);
-    return -1;
+    goto fail_unlocked;
   }
 
   ldout(msgr->cct,1) << "accept sd=" << sd << " " << socket_addr << dendl;
   
   // identify peer
-  char banner[strlen(CEPH_BANNER)+1];
   if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) {
     ldout(msgr->cct,10) << "accept couldn't read banner" << dendl;
-    state = STATE_CLOSED;
-    state_closed.set(1);
-    return -1;
+    goto fail_unlocked;
   }
   if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
     banner[strlen(CEPH_BANNER)] = 0;
     ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl;
-    state = STATE_CLOSED;
-    state_closed.set(1);
-    return -1;
+    goto fail_unlocked;
   }
-  bufferlist addrbl;
   {
     bufferptr tp(sizeof(peer_addr));
     addrbl.push_back(tp);
   }
   if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) {
     ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl;
-    state = STATE_CLOSED;
-    state_closed.set(1);
-    return -1;
+    goto fail_unlocked;
   }
   {
     bufferlist::iterator ti = addrbl.begin();
@@ -297,24 +306,6 @@ int Pipe::accept()
   }
   set_peer_addr(peer_addr);  // so that connection_state gets set up
   
-  ceph_msg_connect connect;
-  ceph_msg_connect_reply reply;
-  Pipe *existing = 0;
-  bufferptr bp;
-  bufferlist authorizer, authorizer_reply;
-  bool authorizer_valid;
-  uint64_t feat_missing;
-  bool replaced = false;
-  CryptoKey session_key;
-
-  // this should roughly mirror pseudocode at
-  //  http://ceph.newdream.net/wiki/Messaging_protocol
-  int reply_tag = 0;
-  uint64_t existing_seq = -1;
-
-  // used for reading in the remote acked seq on connect
-  uint64_t newly_acked_seq = 0;
-
   while (1) {
     if (tcp_read((char*)&connect, sizeof(connect)) < 0) {
       ldout(msgr->cct,10) << "accept couldn't read connect" << dendl;
@@ -545,12 +536,12 @@ int Pipe::accept()
   reply:
     reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
     reply.authorizer_len = authorizer_reply.length();
-    rc = tcp_write((char*)&reply, sizeof(reply));
-    if (rc < 0)
+    r = tcp_write((char*)&reply, sizeof(reply));
+    if (r < 0)
       goto fail_unlocked;
     if (reply.authorizer_len) {
-      rc = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
-      if (rc < 0)
+      r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
+      if (r < 0)
        goto fail_unlocked;
     }
   }
@@ -630,20 +621,20 @@ int Pipe::accept()
   register_pipe();
   msgr->lock.Unlock();
 
-  rc = tcp_write((char*)&reply, sizeof(reply));
-  if (rc < 0) {
+  r = tcp_write((char*)&reply, sizeof(reply));
+  if (r < 0) {
     goto fail_registered;
   }
 
   if (reply.authorizer_len) {
-    rc = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
-    if (rc < 0) {
+    r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
+    if (r < 0) {
       goto fail_registered;
     }
   }
 
   if (reply_tag == CEPH_MSGR_TAG_SEQ) {
-    if(tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
+    if (tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
       ldout(msgr->cct,2) << "accept write error on in_seq" << dendl;
       goto fail_registered;
     }