From: Haomai Wang Date: Fri, 19 Sep 2014 03:40:47 +0000 (+0800) Subject: AsyncConnection: Avoid external thread access center X-Git-Tag: v0.88~37^2~4^2~21 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=cc0554a55a4bd7b2f71e2f33cc4834cc0c578e52;p=ceph.git AsyncConnection: Avoid external thread access center Signed-off-by: Haomai Wang --- diff --git a/src/msg/AsyncConnection.cc b/src/msg/AsyncConnection.cc index 4f9b0c2eeb08..0c0c0ac256af 100644 --- a/src/msg/AsyncConnection.cc +++ b/src/msg/AsyncConnection.cc @@ -83,6 +83,17 @@ class C_handle_dispatch : public EventCallback { } }; +class C_handle_stop : public EventCallback { + AsyncConnectionRef conn; + + public: + C_handle_stop(AsyncConnection *c): conn(c) {} + void do_request(int id) { + conn->stop(); + } +}; + + static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) { // create a buffer to read into that matches the data alignment @@ -110,7 +121,7 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c) : Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), out_seq(0), in_seq(0), in_seq_acked(0), state(STATE_NONE), state_after_send(0), sd(-1), - lock("AsyncConnection::lock"), open_write(false), + lock("AsyncConnection::lock"), open_write(false), keepalive(false), got_bad_auth(false), authorizer(NULL), state_buffer(4096), state_offset(0), net(cct), center(c) { @@ -1595,7 +1606,8 @@ void AsyncConnection::_connect() state = STATE_CONNECTING; // rescheduler connection in order to avoid lock dep - center->create_time_event(0, read_handler); + // may called by external thread(send_message) + center->dispatch_event_external(read_handler); } void AsyncConnection::accept(int incoming) @@ -1622,8 +1634,7 @@ int AsyncConnection::send_message(Message *m) if ((state == STATE_STANDBY || state == STATE_CLOSED) && !policy.server) { _connect(); } else if (sd > 0 && !open_write) { - center->create_file_event(sd, EVENT_WRITABLE, write_handler); - open_write = true; + center->dispatch_event_external(write_handler); } return 0; } @@ -1770,6 +1781,12 @@ void AsyncConnection::was_session_reset() connect_seq = 0; } +void AsyncConnection::mark_down() +{ + Mutex::Locker l(lock); + center->dispatch_event_external(EventCallbackRef(new C_handle_stop(this))); +} + // Who call "_stop(): // 1. receive STATE_OPEN_TAG_CLOSE // 2. fault when policy.lossy @@ -1915,6 +1932,13 @@ void AsyncConnection::handle_ack(uint64_t seq) } } +void AsyncConnection::send_keepalive() +{ + Mutex::Locker l(lock); + keepalive = true; + center->dispatch_event_external(write_handler); +} + void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp) { assert(lock.is_locked()); @@ -1948,6 +1972,11 @@ void AsyncConnection::handle_write() bufferlist bl; int r; if (state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE) { + if (keepalive) { + _send_keepalive_or_ack(); + keepalive = false; + } + if (in_seq > in_seq_acked) { ceph_le64 s; s = in_seq; diff --git a/src/msg/AsyncConnection.h b/src/msg/AsyncConnection.h index 1fbb953ae9f4..3b352d8ceba9 100644 --- a/src/msg/AsyncConnection.h +++ b/src/msg/AsyncConnection.h @@ -85,7 +85,6 @@ class AsyncConnection : public Connection { } return m; } - public: AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c); ~AsyncConnection(); @@ -108,23 +107,13 @@ class AsyncConnection : public Connection { void accept(int sd); int send_message(Message *m); - void send_keepalive() { - Mutex::Locker l(lock); - if (state == STATE_OPEN) - _send_keepalive_or_ack(); - } - void mark_down() { - Mutex::Locker l(lock); - _stop(); - } + void send_keepalive(); + void mark_down(); void mark_disposable() { Mutex::Locker l(lock); policy.lossy = true; } - void handle_write(); - void process(); - private: enum { STATE_NONE, @@ -220,6 +209,7 @@ class AsyncConnection : public Connection { EventCallbackRef write_handler; EventCallbackRef reset_handler; EventCallbackRef remote_reset_handler; + bool keepalive; // Tis section are temp variables used by state transition @@ -248,6 +238,15 @@ class AsyncConnection : public Connection { NetHandler net; EventCenter *center; ceph::shared_ptr session_security; + + public: + // used by eventcallback + void handle_write(); + void process(); + void stop() { + Mutex::Locker l(lock); + _stop(); + } }; /* AsyncConnection */ typedef boost::intrusive_ptr AsyncConnectionRef; diff --git a/src/msg/AsyncMessenger.cc b/src/msg/AsyncMessenger.cc index ae024298352d..4005f76220e5 100644 --- a/src/msg/AsyncMessenger.cc +++ b/src/msg/AsyncMessenger.cc @@ -22,7 +22,7 @@ static ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) { } static ostream& _prefix(std::ostream *_dout, Processor *p) { - return *_dout << "-- Processor"; + return *_dout << " Processor -- "; } static ostream& _prefix(std::ostream *_dout, Worker *w) { @@ -191,7 +191,8 @@ int Processor::start() ldout(msgr->cct, 1) << __func__ << " start" << dendl; // start thread - create(); + if (listen_sd > 0) + create(); return 0; } @@ -466,6 +467,7 @@ AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int // create connection Worker *w = workers[conn_id % workers.size()]; AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center); + conn->connect(addr, type); w->center.dispatch_event_external(EventCallbackRef(new C_handle_connect(conn, addr, type))); assert(!conns.count(addr)); conns[addr] = conn;