]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: Avoid external thread access center
authorHaomai Wang <haomaiwang@gmail.com>
Fri, 19 Sep 2014 03:40:47 +0000 (11:40 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Wed, 8 Oct 2014 06:04:58 +0000 (14:04 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/AsyncConnection.cc
src/msg/AsyncConnection.h
src/msg/AsyncMessenger.cc

index 4f9b0c2eeb08abcff96bd200af63ff95ed79119a..0c0c0ac256afb396662474268cef477cb0cf0003 100644 (file)
@@ -83,6 +83,17 @@ class C_handle_dispatch : public EventCallback {
   }
 };
 
+class C_handle_stop : public EventCallback {
+  AsyncConnectionRef conn;
+
+ public:
+  C_handle_stop(AsyncConnection *c): conn(c) {}
+  void do_request(int id) {
+    conn->stop();
+  }
+};
+
+
 static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
 {
   // create a buffer to read into that matches the data alignment
@@ -110,7 +121,7 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
 AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c)
   : Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), out_seq(0), in_seq(0), in_seq_acked(0),
     state(STATE_NONE), state_after_send(0), sd(-1),
-    lock("AsyncConnection::lock"), open_write(false),
+    lock("AsyncConnection::lock"), open_write(false), keepalive(false),
     got_bad_auth(false), authorizer(NULL),
     state_buffer(4096), state_offset(0), net(cct), center(c)
 {
@@ -1595,7 +1606,8 @@ void AsyncConnection::_connect()
 
   state = STATE_CONNECTING;
   // rescheduler connection in order to avoid lock dep
-  center->create_time_event(0, read_handler);
+  // may called by external thread(send_message)
+  center->dispatch_event_external(read_handler);
 }
 
 void AsyncConnection::accept(int incoming)
@@ -1622,8 +1634,7 @@ int AsyncConnection::send_message(Message *m)
   if ((state == STATE_STANDBY || state == STATE_CLOSED) && !policy.server) {
     _connect();
   } else if (sd > 0 && !open_write) {
-    center->create_file_event(sd, EVENT_WRITABLE, write_handler);
-    open_write = true;
+    center->dispatch_event_external(write_handler);
   }
   return 0;
 }
@@ -1770,6 +1781,12 @@ void AsyncConnection::was_session_reset()
   connect_seq = 0;
 }
 
+void AsyncConnection::mark_down()
+{
+  Mutex::Locker l(lock);
+  center->dispatch_event_external(EventCallbackRef(new C_handle_stop(this)));
+}
+
 // Who call "_stop():
 // 1. receive STATE_OPEN_TAG_CLOSE
 // 2. fault when policy.lossy
@@ -1915,6 +1932,13 @@ void AsyncConnection::handle_ack(uint64_t seq)
   }
 }
 
+void AsyncConnection::send_keepalive()
+{
+  Mutex::Locker l(lock);
+  keepalive = true;
+  center->dispatch_event_external(write_handler);
+}
+
 void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
 {
   assert(lock.is_locked());
@@ -1948,6 +1972,11 @@ void AsyncConnection::handle_write()
   bufferlist bl;
   int r;
   if (state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE) {
+    if (keepalive) {
+      _send_keepalive_or_ack();
+      keepalive = false;
+    }
+
     if (in_seq > in_seq_acked) {
       ceph_le64 s;
       s = in_seq;
index 1fbb953ae9f4ae2143bc06c419593788bf6762c0..3b352d8ceba90ddfd629a20a4788a247c83952ab 100644 (file)
@@ -85,7 +85,6 @@ class AsyncConnection : public Connection {
     }
     return m;
   }
-
  public:
   AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c);
   ~AsyncConnection();
@@ -108,23 +107,13 @@ class AsyncConnection : public Connection {
   void accept(int sd);
   int send_message(Message *m);
 
-  void send_keepalive() {
-    Mutex::Locker l(lock);
-    if (state == STATE_OPEN)
-      _send_keepalive_or_ack();
-  }
-  void mark_down() {
-    Mutex::Locker l(lock);
-    _stop();
-  }
+  void send_keepalive();
+  void mark_down();
   void mark_disposable() {
     Mutex::Locker l(lock);
     policy.lossy = true;
   }
 
-  void handle_write();
-  void process();
-
  private:
   enum {
     STATE_NONE,
@@ -220,6 +209,7 @@ class AsyncConnection : public Connection {
   EventCallbackRef write_handler;
   EventCallbackRef reset_handler;
   EventCallbackRef remote_reset_handler;
+  bool keepalive;
 
   // Tis section are temp variables used by state transition
 
@@ -248,6 +238,15 @@ class AsyncConnection : public Connection {
   NetHandler net;
   EventCenter *center;
   ceph::shared_ptr<AuthSessionHandler> session_security;
+
+ public:
+  // used by eventcallback
+  void handle_write();
+  void process();
+  void stop() {
+    Mutex::Locker l(lock);
+    _stop();
+  }
 }; /* AsyncConnection */
 
 typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
index ae024298352d5739e9f5f9f944cc71ecd78597d8..4005f76220e52a56bc315efedc0cf99dcebee9d4 100644 (file)
@@ -22,7 +22,7 @@ static ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) {
 }
 
 static ostream& _prefix(std::ostream *_dout, Processor *p) {
-  return *_dout << "-- Processor";
+  return *_dout << " Processor -- ";
 }
 
 static ostream& _prefix(std::ostream *_dout, Worker *w) {
@@ -191,7 +191,8 @@ int Processor::start()
   ldout(msgr->cct, 1) << __func__ << " start" << dendl;
 
   // start thread
-  create();
+  if (listen_sd > 0)
+    create();
 
   return 0;
 }
@@ -466,6 +467,7 @@ AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int
   // create connection
   Worker *w = workers[conn_id % workers.size()];
   AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center);
+  conn->connect(addr, type);
   w->center.dispatch_event_external(EventCallbackRef(new C_handle_connect(conn, addr, type)));
   assert(!conns.count(addr));
   conns[addr] = conn;