git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Add support for mon library
author Haomai Wang <haomaiwang@gmail.com>
Fri, 12 Sep 2014 07:53:27 +0000 (15:53 +0800)
committer Haomai Wang <haomaiwang@gmail.com>
Wed, 8 Oct 2014 06:04:57 +0000 (14:04 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/AsyncConnection.cc
src/msg/AsyncConnection.h
src/msg/AsyncMessenger.cc
src/msg/AsyncMessenger.h
src/msg/Event.cc
src/msg/Event.h
src/msg/EventEpoll.cc

index 730cf484918e56ff7274fb97db95e71b40c7ce3d..a1f71de820724ee6dd5adaa596384905f8ad451c 100644 (file)
@@ -56,6 +56,17 @@ class C_handle_reset : public EventCallback {
   }
 };
 
+class C_handle_dispatch : public EventCallback {
+  AsyncMessenger *msgr;
+  Message *m;
+
+ public:
+  C_handle_dispatch(AsyncMessenger *msgr, Message *m): msgr(msgr), m(m) {}
+  void do_request(int id) {
+    msgr->ms_deliver_dispatch(m);
+  }
+};
+
 static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
 {
   // create a buffer to read into that matches the data alignment
@@ -83,8 +94,9 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
 AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m)
   : Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), out_seq(0), in_seq(0), in_seq_acked(0),
     state(STATE_NONE), state_after_send(0), sd(-1),
-    lock("AsyncConnection::lock"),
-    got_bad_auth(false), authorizer(NULL), state_offset(0), net(cct), center(&m->center) { }
+    lock("AsyncConnection::lock"), open_write(false),
+    got_bad_auth(false), authorizer(NULL),
+    state_buffer(4096), state_offset(0), net(cct), center(&m->center) { }
 
 AsyncConnection::~AsyncConnection()
 {
@@ -100,13 +112,13 @@ int AsyncConnection::read_bulk(int fd, char *buf, int len)
     if (errno == EAGAIN || errno == EINTR) {
       nread = 0;
     } else {
-      ldout(async_msgr->cct, 1) << __func__ << " Reading from fd %d " << fd
+      ldout(async_msgr->cct, 1) << __func__ << " Reading from fd=" << fd
                           << " : "<< strerror(errno) << dendl;
       return -1;
     }
   } else if (nread == 0) {
-    ldout(async_msgr->cct, 1) << __func__ << " Peer close file descriptor %d "
-                        << fd << dendl;
+    ldout(async_msgr->cct, 1) << __func__ << " Peer close file descriptor "
+                              << fd << dendl;
     return -1;
   }
   return nread;
@@ -185,6 +197,7 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send)
     struct iovec *msgvec = new iovec[size];
     memset(&msg, 0, sizeof(msg));
     msg.msg_iovlen = 0;
+    msg.msg_iov = msgvec;
     int msglen = 0;
     while (size > 0) {
       msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str());
@@ -199,10 +212,10 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send)
     if (r < 0)
       return r;
 
+    delete msgvec;
     // "r" is the remaining length
     sended += msglen - r;
     if (r > 0) {
-      center->create_file_event(sd, EVENT_WRITABLE, new C_handle_write(this));
       ldout(async_msgr->cct, 5) << __func__ << " remaining " << r
                           << " needed to be sent, creating event for writing"
                           << dendl;
@@ -218,8 +231,15 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send)
     bl.swap(outcoming_bl);
   }
 
-  if (!outcoming_bl.length())
+  if (!open_write && is_queued()) {
+    center->create_file_event(sd, EVENT_WRITABLE, new C_handle_write(this));
+    open_write = true;
+  }
+
+  if (open_write && !is_queued()) {
     center->delete_file_event(sd, EVENT_WRITABLE);
+    open_write = false;
+  }
 
   return outcoming_bl.length();
 }
@@ -258,12 +278,12 @@ int AsyncConnection::read_until(uint64_t needed, bufferptr &p)
 void AsyncConnection::process()
 {
   int r = 0;
-  int prev_state;
+  int prev_state = state;
   Mutex::Locker l(lock);
   do {
-    prev_state = state;
     ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
                                << ", prev state is " << get_state_name(prev_state) << dendl;
+    prev_state = state;
     switch (state) {
       case STATE_OPEN:
         {
@@ -404,11 +424,10 @@ void AsyncConnection::process()
           }
 
           // Reset state
+          data_buf.clear();
           front.clear();
           middle.clear();
           data.clear();
-          memset(&connect_msg, 0, sizeof(connect_msg));
-          memset(&connect_reply, 0, sizeof(connect_reply));
           recv_stamp = ceph_clock_now(async_msgr->cct);
           current_header = header;
           state = STATE_OPEN_MESSAGE_THROTTLE_MESSAGE;
@@ -422,10 +441,10 @@ void AsyncConnection::process()
                                 << policy.throttler_messages->get_current() << "/"
                                 << policy.throttler_messages->get_max() << dendl;
             // FIXME: may block
-            if (policy.throttler_messages->get())
-              state = STATE_OPEN_MESSAGE_THROTTLE_BYTES;
+            policy.throttler_messages->get();
           }
 
+          state = STATE_OPEN_MESSAGE_THROTTLE_BYTES;
           break;
         }
 
@@ -438,11 +457,11 @@ void AsyncConnection::process()
                   << policy.throttler_bytes->get_current() << "/"
                   << policy.throttler_bytes->get_max() << dendl;
               // FIXME: may block
-              if (policy.throttler_bytes->get(message_size))
-                state = STATE_OPEN_MESSAGE_READ_FRONT;
+              policy.throttler_bytes->get(message_size);
             }
           }
 
+          state = STATE_OPEN_MESSAGE_READ_FRONT;
           break;
         }
 
@@ -493,26 +512,23 @@ void AsyncConnection::process()
           // read data
           uint64_t data_len = le32_to_cpu(current_header.data_len);
           int data_off = le32_to_cpu(current_header.data_off);
-          bufferlist bl;
           if (data_len) {
             // get a buffer
-            lock.Lock();
             map<ceph_tid_t,pair<bufferlist,int> >::iterator p = rx_buffers.find(current_header.tid);
             if (p != rx_buffers.end()) {
               ldout(async_msgr->cct,10) << __func__ << " seleting rx buffer v " << p->second.second
                                   << " at offset " << data_off
                                   << " len " << p->second.first.length() << dendl;
-              bl = p->second.first;
+              data_buf = p->second.first;
               // make sure it's big enough
-              if (bl.length() < data_len)
-                bl.push_back(buffer::create(data_len - bl.length()));
-              data_blp = bl.begin();
+              if (data_buf.length() < data_len)
+                data_buf.push_back(buffer::create(data_len - data_buf.length()));
+              data_blp = data_buf.begin();
             } else {
               ldout(async_msgr->cct,20) << __func__ << " allocating new rx buffer at offset " << data_off << dendl;
-              alloc_aligned_buffer(bl, data_len, data_off);
-              data_blp = bl.begin();
+              alloc_aligned_buffer(data_buf, data_len, data_off);
+              data_blp = data_buf.begin();
             }
-            lock.Unlock();
           }
 
           msg_left = data_len;
@@ -522,7 +538,7 @@ void AsyncConnection::process()
 
       case STATE_OPEN_MESSAGE_READ_DATA:
         {
-          do {
+          while (msg_left > 0) {
             bufferptr bp = data_blp.get_current_ptr();
             uint64_t read = MIN(bp.length(), msg_left);
             r = read_until(read, bp);
@@ -535,7 +551,8 @@ void AsyncConnection::process()
 
             data_blp.advance(read);
             data.append(bp, 0, read);
-          } while (msg_left > 0);
+            msg_left -= read;
+          }
 
           if (msg_left == 0)
             state = STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH;
@@ -593,7 +610,7 @@ void AsyncConnection::process()
           //
 
           ceph::shared_ptr<AuthSessionHandler> auth_handler = session_security;
-          if (auth_handler) {
+          if (auth_handler == NULL) {
             ldout(async_msgr->cct, 10) << __func__ << " No session security set" << dendl;
           } else {
             if (auth_handler->check_message_signature(message)) {
@@ -638,7 +655,7 @@ void AsyncConnection::process()
           if (async_msgr->ms_can_fast_dispatch(message)) {
             async_msgr->ms_fast_dispatch(message);
           } else {
-            async_msgr->ms_deliver_dispatch(message);
+            center->create_time_event(0, new C_handle_dispatch(async_msgr, message));
           }
 
           state = STATE_OPEN;
@@ -662,13 +679,6 @@ void AsyncConnection::process()
       case STATE_CLOSED:
         {
           ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl;
-          center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
-          break;
-        }
-
-      case STATE_FAULT:
-        {
-          ldout(async_msgr->cct, 20) << __func__ << " socket is in error" << dendl;
           break;
         }
 
@@ -676,9 +686,12 @@ void AsyncConnection::process()
         {
           if (_process_connection() < 0)
             goto fail;
+          break;
         }
     }
 
+    continue;
+
 fail:
     // clean up state internal variables and states
     if (state >= STATE_CONNECTING_SEND_CONNECT_MSG &&
@@ -709,7 +722,6 @@ fail:
       }
     }
     fault();
-    state = STATE_FAULT;
   } while (prev_state != state);
 }
 
@@ -766,6 +778,8 @@ int AsyncConnection::_process_connection()
           goto fail;
         }
 
+        ldout(async_msgr->cct, 10) << __func__ << " get banner, ready to send banner" << dendl;
+
         bufferlist bl;
         bl.append(state_buffer.c_str(), strlen(CEPH_BANNER));
         r = _try_send(bl);
@@ -852,30 +866,28 @@ int AsyncConnection::_process_connection()
           delete authorizer;
           authorizer = async_msgr->get_authorizer(peer_type, false);
         }
-        assert(authorizer);
         bufferlist bl;
 
-        ceph_msg_connect connect;
-        connect.features = policy.features_supported;
-        connect.host_type = async_msgr->get_myinst().name.type();
-        connect.global_seq = global_seq;
-        connect.connect_seq = connect_seq;
-        connect.protocol_version = async_msgr->get_proto_version(peer_type, true);
-        connect.authorizer_protocol = authorizer ? authorizer->protocol : 0;
-        connect.authorizer_len = authorizer ? authorizer->bl.length() : 0;
+        connect_msg.features = policy.features_supported;
+        connect_msg.host_type = async_msgr->get_myinst().name.type();
+        connect_msg.global_seq = global_seq;
+        connect_msg.connect_seq = connect_seq;
+        connect_msg.protocol_version = async_msgr->get_proto_version(peer_type, true);
+        connect_msg.authorizer_protocol = authorizer ? authorizer->protocol : 0;
+        connect_msg.authorizer_len = authorizer ? authorizer->bl.length() : 0;
         if (authorizer)
-          ldout(async_msgr->cct, 10) << __func__ <<  "connect.authorizer_len="
-              << connect.authorizer_len << " protocol="
-              << connect.authorizer_protocol << dendl;
-        connect.flags = 0;
+          ldout(async_msgr->cct, 10) << __func__ <<  "connect_msg.authorizer_len="
+              << connect_msg.authorizer_len << " protocol="
+              << connect_msg.authorizer_protocol << dendl;
+        connect_msg.flags = 0;
         if (policy.lossy)
-          connect.flags |= CEPH_MSG_CONNECT_LOSSY;  // this is fyi, actually, server decides!
-        bl.append((char*)&connect, sizeof(connect));
+          connect_msg.flags |= CEPH_MSG_CONNECT_LOSSY;  // this is fyi, actually, server decides!
+        bl.append((char*)&connect_msg, sizeof(connect_msg));
         if (authorizer) {
           bl.append(authorizer->bl.c_str(), authorizer->bl.length());
         }
         ldout(async_msgr->cct, 10) << __func__ << " connect sending gseq=" << global_seq << " cseq="
-            << connect_seq << " proto=" << connect.protocol_version << dendl;
+            << connect_seq << " proto=" << connect_msg.protocol_version << dendl;
 
         r = _try_send(bl);
         if (r == 0) {
@@ -932,7 +944,7 @@ int AsyncConnection::_process_connection()
 
           authorizer_reply.push_back(state_buffer);
           bufferlist::iterator iter = authorizer_reply.begin();
-          if (!authorizer->verify_reply(iter)) {
+          if (authorizer && !authorizer->verify_reply(iter)) {
             ldout(async_msgr->cct, 0) << __func__ << " failed verifying authorize reply" << dendl;
             goto fail;
           }
@@ -1021,7 +1033,9 @@ int AsyncConnection::_process_connection()
         got_bad_auth = false;
         delete authorizer;
         authorizer = NULL;
-        return 0;
+        memset(&connect_msg, 0, sizeof(connect_msg));
+        memset(&connect_reply, 0, sizeof(connect_reply));
+        break;
       }
 
     case STATE_ACCEPTING:
@@ -1176,6 +1190,7 @@ int AsyncConnection::_process_connection()
       {
         ldout(async_msgr->cct, 20) << __func__ << " accept done" << dendl;
         state = STATE_OPEN;
+        memset(&connect_msg, 0, sizeof(connect_msg));
         break;
       }
 
@@ -1216,7 +1231,6 @@ int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_co
     if (got_bad_auth)
       goto fail;
     got_bad_auth = true;
-    assert(authorizer);
     delete authorizer;
     authorizer = async_msgr->get_authorizer(peer_type, true);  // try harder
     state = STATE_CONNECTING_SEND_CONNECT_MSG;
@@ -1446,11 +1460,11 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     lock.Lock();
   } else {
     lock.Lock();
-    existing->lock.Unlock();
+    existing->lock.Lock();
   }
   if (existing->policy.lossy) {
     // disconnect from the Connection
-    async_msgr->ms_deliver_handle_reset(existing);
+    async_msgr->ms_deliver_handle_reset(existing.get());
   } else {
     // queue a reset on the new connection, which we're dumping for the old
     async_msgr->ms_deliver_handle_reset(this);
@@ -1561,6 +1575,21 @@ void AsyncConnection::accept(int incoming)
   process();
 }
 
+int AsyncConnection::send_message(Message *m)
+{
+  m->get_header().src = async_msgr->get_myname();
+  if (!m->get_priority())
+    m->set_priority(async_msgr->get_default_send_priority());
+
+  Mutex::Locker l(lock);
+  out_q[m->get_priority()].push_back(m);
+  if (sd > 0 && !open_write) {
+    center->create_file_event(sd, EVENT_WRITABLE, new C_handle_write(this));
+    open_write = true;
+  }
+  return 0;
+}
+
 void AsyncConnection::requeue_sent()
 {
   if (sent.empty())
@@ -1649,6 +1678,7 @@ void AsyncConnection::fault()
 
   shutdown_socket();
   center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+  open_write = false;
 
   // requeue sent items
   requeue_sent();
@@ -1714,6 +1744,8 @@ void AsyncConnection::_stop()
   shutdown_socket();
   discard_out_queue();
   outcoming_bl.clear();
+  center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+  open_write = false;
   state = STATE_CLOSED;
 }
 
@@ -1865,6 +1897,7 @@ void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
     bl.append(CEPH_MSGR_TAG_KEEPALIVE);
   }
 
+  ldout(async_msgr->cct, 10) << __func__ << " try send keepalive or ack" << dendl;
   _try_send(bl, false);
 }
 
@@ -1873,30 +1906,48 @@ void AsyncConnection::handle_write()
   ldout(async_msgr->cct, 10) << __func__ << " started." << dendl;
   Mutex::Locker l(lock);
   bufferlist bl;
-  if (in_seq > in_seq_acked) {
-    ceph_le64 s;
-    s = in_seq;
-    bl.append(CEPH_MSGR_TAG_ACK);
-    bl.append((char*)&s, sizeof(s));
-  }
+  int r;
+  if (state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE) {
+    if (in_seq > in_seq_acked) {
+      ceph_le64 s;
+      s = in_seq;
+      bl.append(CEPH_MSGR_TAG_ACK);
+      bl.append((char*)&s, sizeof(s));
+      ldout(async_msgr->cct, 10) << __func__ << " try send msg ack" << dendl;
+    }
 
-  int r = _try_send(bl);
-  if (r < 0)
-    goto fail;
-  else if (r > 0)
-    return ;
+    r = _try_send(bl);
+    if (r < 0) {
+      ldout(async_msgr->cct, 1) << __func__ << " send msg ack failed :"
+                                << strerror(errno) << dendl;
+      goto fail;
+    } else if (r > 0) {
+      return ;
+    }
 
-  while (1) {
-    Message *m = _get_next_outgoing();
-    if (!m)
-      break;
+    while (1) {
+      Message *m = _get_next_outgoing();
+      if (!m)
+        break;
 
-    assert(m);
-    r = _send(m);
-    if (r < 0)
+      assert(m);
+      ldout(async_msgr->cct, 10) << __func__ << " try send msg " << m << dendl;
+      r = _send(m);
+      if (r < 0) {
+        ldout(async_msgr->cct, 1) << __func__ << " send msg failed :"
+                                  << strerror(errno) << dendl;
+        goto fail;
+      } else if (r > 0) {
+        break;
+      }
+    }
+  } else {
+    r = _try_send(bl);
+    if (r < 0) {
+      ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed :"
+                                << strerror(errno) << dendl;
       goto fail;
-    else if (r > 0)
-      break;
+    }
   }
 
   return ;
index a010cb0912bc36c7d4622d2acc49d0117e96e5dd..dfc28c8c62fa74a86f26f4e67d11312fe7d7af3f 100644 (file)
@@ -93,8 +93,8 @@ class AsyncConnection : public Connection {
   ostream& _conn_prefix(std::ostream *_dout);
 
   bool is_connected() {
-    Mutex::Locker l(lock);
-    return state != STATE_CLOSED;
+    // FIXME?
+    return true;
   }
 
   // Only call when AsyncConnection first construct
@@ -106,15 +106,12 @@ class AsyncConnection : public Connection {
   }
   // Only call when AsyncConnection first construct
   void accept(int sd);
-  int send_message(Message *m) {
-    Mutex::Locker l(lock);
-    out_q[m->get_priority()].push_back(m);
-    return 0;
-  }
+  int send_message(Message *m);
 
   void send_keepalive() {
     Mutex::Locker l(lock);
-    _send_keepalive_or_ack();
+    if (state == STATE_OPEN)
+      _send_keepalive_or_ack();
   }
   void mark_down() {
     Mutex::Locker l(lock);
@@ -163,7 +160,6 @@ class AsyncConnection : public Connection {
     STATE_STANDBY,
     STATE_CLOSED,
     STATE_WAIT,       // just wait for racing connection
-    STATE_FAULT
   };
 
   static const char *get_state_name(int state) {
@@ -219,6 +215,7 @@ class AsyncConnection : public Connection {
   list<Message*> sent;
   Mutex lock;
   utime_t backoff;         // backoff time
+  bool open_write;
 
   // Tis section are temp variables used by state transition
 
@@ -227,13 +224,14 @@ class AsyncConnection : public Connection {
   utime_t throttle_stamp;
   uint64_t msg_left;
   ceph_msg_header current_header;
+  bufferlist data_buf;
   bufferlist::iterator data_blp;
   bufferlist front, middle, data;
   ceph_msg_connect connect_msg;
-  ceph_msg_connect_reply connect_reply;
   // Connecting state
   bool got_bad_auth;
   AuthAuthorizer *authorizer;
+  ceph_msg_connect_reply connect_reply;
   // Accepting state
   entity_addr_t socket_addr;
   CryptoKey session_key;
index b1f69f447a304af3cd28c3d28d97b1139eeb3735..abd27622ecc10102892ad2457a4a1fd678d4fe82 100644 (file)
@@ -186,7 +186,8 @@ int Processor::start()
 
   // start thread
   create();
-  center->create_file_event(listen_sd, EVENT_READABLE, new C_handle_accept(this));
+  if (listen_sd >= 0)
+    center->create_file_event(listen_sd, EVENT_READABLE, new C_handle_accept(this));
 
   return 0;
 }
@@ -216,7 +217,7 @@ void *Processor::entry()
   while (!done) {
     ldout(msgr->cct,20) << __func__ << " calling poll" << dendl;
 
-    r = center->process_events(30000);
+    r = center->process_events(1000);
     if (r < 0) {
       ldout(msgr->cct,20) << __func__ << " process events failed: "
                           << cpp_strerror(errno) << dendl;
@@ -239,7 +240,8 @@ void Processor::stop()
   done = true;
   ldout(msgr->cct, 10) << __func__ << " processor" << dendl;
 
-  center->delete_file_event(listen_sd, EVENT_READABLE);
+  if (listen_sd >= 0)
+    center->delete_file_event(listen_sd, EVENT_READABLE);
   if (listen_sd >= 0) {
     ::shutdown(listen_sd, SHUT_RDWR);
   }
@@ -269,12 +271,13 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
     lock("AsyncMessenger::lock"),
     nonce(_nonce), did_bind(false),
     global_seq(0),
-    cluster_protocol(0),
+    cluster_protocol(0), stopped(true),
     local_connection(new AsyncConnection(cct, this)),
     center(cct)
 {
   ceph_spin_init(&global_seq_lock);
   init_local_connection();
+  center.init(5000);
 }
 
 /**
@@ -291,18 +294,20 @@ void AsyncMessenger::ready()
   ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
 
   lock.Lock();
-  if (did_bind)
-    processor.start();
+  processor.start();
   lock.Unlock();
 }
 
 int AsyncMessenger::shutdown()
 {
   ldout(cct,10) << __func__ << "shutdown " << get_myaddr() << dendl;
+  center.stop();
   mark_down_all();
 
   // break ref cycles on the loopback connection
   local_connection->set_priv(NULL);
+  stop_cond.Signal();
+  stopped = true;
   return 0;
 }
 
@@ -330,6 +335,7 @@ int AsyncMessenger::rebind(const set<int>& avoid_ports)
 {
   ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl;
   assert(did_bind);
+  center.stop();
   processor.stop();
   mark_down_all();
   return processor.rebind(avoid_ports);
@@ -345,6 +351,7 @@ int AsyncMessenger::start()
 
   assert(!started);
   started = true;
+  stopped = false;
 
   if (!did_bind) {
     my_inst.addr.nonce = nonce;
@@ -353,8 +360,7 @@ int AsyncMessenger::start()
 
   lock.Unlock();
 
-  // FIXME
-  center.init(5000);
+  center.start();
   return 0;
 }
 
@@ -365,17 +371,15 @@ void AsyncMessenger::wait()
     lock.Unlock();
     return;
   }
+  if (!stopped)
+    stop_cond.Wait(lock);
   lock.Unlock();
 
   // done!  clean up.
-  if (did_bind) {
-    ldout(cct,20) << __func__ << ": stopping processor thread" << dendl;
-    processor.stop();
-    did_bind = false;
-    ldout(cct,20) << __func__ << ": stopped processor thread" << dendl;
-  }
-
-  center.stop();
+  ldout(cct,20) << __func__ << ": stopping processor thread" << dendl;
+  processor.stop();
+  did_bind = false;
+  ldout(cct,20) << __func__ << ": stopped processor thread" << dendl;
 
   // close all pipes
   lock.Lock();
@@ -447,12 +451,6 @@ ConnectionRef AsyncMessenger::get_loopback_connection()
 
 int AsyncMessenger::_send_message(Message *m, const entity_inst_t& dest)
 {
-  // set envelope
-  m->get_header().src = get_myname();
-
-  if (!m->get_priority())
-    m->set_priority(get_default_send_priority());
-
   ldout(cct, 1) << __func__ << "--> " << dest.name << " "
                 << dest.addr << " -- " << *m << " -- ?+"
                 << m->get_data().length() << " " << m << dendl;
index efe811f52c561316e907e904e046f1bc652b952b..751d87fdcc538e2761473447fa04f7b3af309db5 100644 (file)
@@ -267,6 +267,9 @@ private:
   /// internal cluster protocol version, if any, for talking to entities of the same type.
   int cluster_protocol;
 
+  Cond  stop_cond;
+  bool stopped;
+
   AsyncConnectionRef _lookup_conn(const entity_addr_t& k) {
     assert(lock.is_locked());
     ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator p = conns.find(k);
@@ -274,7 +277,6 @@ private:
       return NULL;
     if (!p->second->is_connected()) {
       // FIXME
-      p->second->put();
       conns.erase(p);
       return NULL;
     }
@@ -318,8 +320,6 @@ public:
 
   void accept_conn(AsyncConnectionRef conn) {
     Mutex::Locker l(lock);
-    if (conns.count(conn->get_peer_addr()))
-      delete conns[conn->get_peer_addr()];
     conns[conn->peer_addr] = conn;
     accepting_conns.erase(conn);
   }
index 4d300a6a8871655774f3562c149df457d18dabc9..3c3a22ffb7069ad8102715419b0ede9104454e55 100644 (file)
@@ -46,7 +46,6 @@ int EventCenter::init(int n)
   }
 
   nevent = n;
-  event_tp.start();
   return 0;
 }
 
@@ -93,6 +92,8 @@ int EventCenter::create_file_event(int fd, int mask, EventCallback *ctxt)
       delete event->write_cb;
     event->write_cb = ctxt;
   }
+  ldout(cct, 10) << __func__ << " create event fd=" << fd << " mask=" << mask
+                 << " now mask is " << event->mask << dendl;
   return 0;
 }
 
@@ -101,26 +102,43 @@ void EventCenter::delete_file_event(int fd, int mask)
   Mutex::Locker l(lock);
 
   EventCenter::FileEvent *event = _get_file_event(fd);
-  driver->del_event(fd, event ? event->mask: EVENT_NONE, mask);
-  if (!event) {
-    file_events[fd] = EventCenter::FileEvent();
-    event = &file_events[fd];
-  }
+  if (!event)
+    return ;
+
+  driver->del_event(fd, event->mask, mask);
 
-  if (event->read_cb)
+  if (mask & EVENT_READABLE && event->read_cb) {
     delete event->read_cb;
-  if (event->write_cb)
+    event->read_cb = NULL;
+  }
+  if (mask & EVENT_WRITABLE && event->write_cb) {
     delete event->write_cb;
+    event->write_cb = NULL;
+  }
 
   event->mask = event->mask & (~mask);
   if (event->mask == EVENT_NONE)
     file_events.erase(fd);
+  ldout(cct, 10) << __func__ << " delete fd=" << fd << " mask=" << mask
+                 << " now mask is " << event->mask << dendl;
 }
 
 uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallback *ctxt)
 {
   Mutex::Locker l(lock);
   uint64_t id = time_event_next_id++;
+
+  ldout(cct, 10) << __func__ << " id=" << id << " expire time=" << milliseconds << dendl;
+  // Direct dispatch
+  if (milliseconds == 0) {
+    FiredEvent e;
+    e.time_event.id = id;
+    e.time_event.time_cb = ctxt;
+    e.is_file = false;
+    event_wq.queue(e);
+    return id;
+  }
+
   EventCenter::TimeEvent event;
   utime_t expire;
   struct timeval tv;
@@ -135,7 +153,6 @@ uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallback *ct
   event.time_cb = ctxt;
   time_to_ids[expire] = id;
   time_events[id] = event;
-  ldout(cct, 10) << __func__ << " id=" << id << " trigger time is " << expire << dendl;
 
   return id;
 }
@@ -148,11 +165,19 @@ void EventCenter::delete_time_event(uint64_t id)
     if (it->second == id) {
       time_to_ids.erase(it);
       time_events.erase(id);
+      ldout(cct, 10) << __func__ << " id=" << id << dendl;
       return ;
     }
   }
 }
 
+void EventCenter::start()
+{
+  ldout(cct, 1) << __func__ << dendl;
+  Mutex::Locker l(lock);
+  event_tp.start();
+}
+
 void EventCenter::stop()
 {
   ldout(cct, 1) << __func__ << dendl;
@@ -226,6 +251,11 @@ int EventCenter::process_events(int timeout_millionseconds)
 
   {
     Mutex::Locker l(lock);
+    for (map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
+          it != time_to_ids.end(); ++it) {
+      ldout(cct, 10) << __func__ << " time_to_ids " << it->first << " id=" << it->second << dendl;
+    }
+
     map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
     if (it != time_to_ids.end() && shortest > it->first) {
       ldout(cct, 10) << __func__ << " shortest is " << shortest << " it->first is " << it->first << dendl;
@@ -247,6 +277,7 @@ int EventCenter::process_events(int timeout_millionseconds)
     e.file_event = fired_events[j];
     e.is_file = true;
     event_wq.queue(e);
+    ldout(cct, 10) << __func__ << " event_wq queue fd is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl;
   }
 
   if (trigger_time)
index d4127966484b3aeca92779fd8821907a574586ce..19da2a28455100de930227cfbd4f50475a9c9478 100644 (file)
@@ -82,7 +82,7 @@ class EventCenter {
     uint64_t id;
     EventCallback *time_cb;
 
-    TimeEvent(): id(0), fd(0), time_cb(NULL) {}
+    TimeEvent(): id(0), time_cb(NULL) {}
   };
 
   Mutex lock;
@@ -195,6 +195,7 @@ class EventCenter {
   void delete_file_event(int fd, int mask);
   void delete_time_event(uint64_t id);
   int process_events(int timeout_milliseconds);
+  void start();
   void stop();
   FileEvent *get_file_event(int fd) {
     Mutex::Locker l(lock);
index 865cf012d1b3cd5889001e4a27a631e454bcbb2a..6d5ec5964bc1fb7849b606e718232a553e8a6ca5 100644 (file)
@@ -39,9 +39,10 @@ int EpollDriver::add_event(int fd, int cur_mask, int add_mask)
       pos = deleted_fds.front();
       deleted_fds.pop_front();
     } else {
-      fds[fd] = pos = next_pos;
+      pos = next_pos;
       next_pos++;
     }
+    fds[fd] = pos;
   } else {
     op = cur_mask == EVENT_NONE ? EPOLL_CTL_ADD: EPOLL_CTL_MOD;
   }
@@ -59,6 +60,9 @@ int EpollDriver::add_event(int fd, int cur_mask, int add_mask)
                        << cpp_strerror(errno) << dendl;
     return -errno;
   }
+
+  ldout(cct, 10) << __func__ << " add event to fd=" << fd << " mask=" << add_mask
+                 << dendl;
   return 0;
 }
 
@@ -77,11 +81,17 @@ void EpollDriver::del_event(int fd, int cur_mask, int delmask)
   ee.data.u64 = 0; /* avoid valgrind warning */
   ee.data.fd = fd;
   if (mask != EVENT_NONE) {
-    epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ee);
+    if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ee) < 0) {
+      lderr(cct) << __func__ << " epoll_ctl: modify fd=" << fd << " mask=" << mask
+                 << " failed." << cpp_strerror(errno) << dendl;
+    }
   } else {
     /* Note, Kernel < 2.6.9 requires a non null event pointer even for
      * EPOLL_CTL_DEL. */
-    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ee);
+    if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ee) < 0) {
+      lderr(cct) << __func__ << " epoll_ctl: delete fd=" << fd
+                 << " failed." << cpp_strerror(errno) << dendl;
+    }
 
     if (next_pos == it->second)
       next_pos--;
@@ -89,6 +99,8 @@ void EpollDriver::del_event(int fd, int cur_mask, int delmask)
       deleted_fds.push_back(it->second);
     fds.erase(fd);
   }
+  ldout(cct, 10) << __func__ << " del event fd=" << fd << " cur mask=" << mask
+                 << dendl;
 }
 
 int EpollDriver::resize_events(int newsize)