From: Haomai Wang Date: Wed, 4 Feb 2015 18:06:55 +0000 (+0800) Subject: AsyncConnection: Don't delete event if fd < 0 X-Git-Tag: v0.93~80^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2b6b100a9b61e908c745c70c6d0ccb196a4e1bd7;p=ceph.git AsyncConnection: Don't delete event if fd < 0 Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 9af029cd4022..61dccf127fc9 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -1726,7 +1726,8 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a // Now existing connection will be alive and the current connection will // exchange socket with existing connection because we want to maintain // original "connection_state" - existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE); + if (existing->sd > 0) + existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE); center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); existing->center->create_file_event(sd, EVENT_READABLE, existing->read_handler); diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index cc6430741c1a..44b8cebfdac9 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -329,7 +329,7 @@ void *Worker::entry() while (!done) { ldout(cct, 20) << __func__ << " calling event process" << dendl; - int r = center.process_events(30000000); + int r = center.process_events(EventMaxWaitUs); if (r < 0) { ldout(cct, 20) << __func__ << " process events failed: " << cpp_strerror(errno) << dendl; diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index e0061aa09142..685799f64bd4 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -42,6 +42,8 @@ class AsyncMessenger; class WorkerPool; class Worker : public Thread { + static const uint64_t InitEventNumber = 5000; + static const uint64_t EventMaxWaitUs = 30000000; CephContext *cct; WorkerPool *pool; bool done; @@ -51,7 +53,7 @@ class Worker : public Thread { EventCenter center; Worker(CephContext *c, WorkerPool *p, int i) : cct(c), pool(p), done(false), id(i), center(c) { - center.init(5000); + center.init(InitEventNumber); } void *entry(); void stop(); diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index 99850cfe317b..bbb2457fb76f 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -111,7 +111,7 @@ int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) { int r = 0; Mutex::Locker l(file_lock); - if (fd > nevent) { + if (fd >= nevent) { int new_size = nevent << 2; while (fd > new_size) new_size <<= 2; @@ -155,6 +155,7 @@ int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) void EventCenter::delete_file_event(int fd, int mask) { + assert(fd > 0); Mutex::Locker l(file_lock); if (fd > nevent) { ldout(cct, 1) << __func__ << " delete event fd=" << fd << " exceed nevent=" << nevent diff --git a/src/test/msgr/test_async_driver.cc b/src/test/msgr/test_async_driver.cc index 6424e5819248..369085109d90 100644 --- a/src/test/msgr/test_async_driver.cc +++ b/src/test/msgr/test_async_driver.cc @@ -239,6 +239,27 @@ TEST_P(EventDriverTest, NetworkSocketTest) { ::close(listen_sd); } +class FakeEvent : public EventCallback { + + public: + void do_request(int fd_or_id) {} +}; + +TEST(EventCenterTest, FileEventExpansion) { + vector sds; + EventCenter center; + center.init(100); + EventCallbackRef e(new FakeEvent()); + for (int i = 0; i < 10000; i++) { + int s = ::socket(AF_INET, SOCK_STREAM, 0); + center.create_file_event(i, EVENT_READABLE, e); + sds.push_back(::socket(AF_INET, SOCK_STREAM, 0)); + } + + for (vector::iterator it = sds.begin(); it != sds.end(); ++it) + center.delete_file_event(*it, EVENT_READABLE); +} + INSTANTIATE_TEST_CASE_P( AsyncMessenger, EventDriverTest,