]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: Don't delete event if fd < 0
authorHaomai Wang <haomaiwang@gmail.com>
Wed, 4 Feb 2015 18:06:55 +0000 (02:06 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Thu, 5 Feb 2015 02:37:42 +0000 (10:37 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h
src/msg/async/Event.cc
src/test/msgr/test_async_driver.cc

index 9af029cd40220a60e410bdef9ebb20602517ed78..61dccf127fc9c5669555bca241907378302cdd45 100644 (file)
@@ -1726,7 +1726,8 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     // Now existing connection will be alive and the current connection will
     // exchange socket with existing connection because we want to maintain
     // original "connection_state"
-    existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE);
+    if (existing->sd > 0)
+      existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE);
     center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
     existing->center->create_file_event(sd, EVENT_READABLE, existing->read_handler);
 
index cc6430741c1a4fba4ec4853b2e09a708910c50dc..44b8cebfdac919deb08ee03410fea26f58765e5c 100644 (file)
@@ -329,7 +329,7 @@ void *Worker::entry()
   while (!done) {
     ldout(cct, 20) << __func__ << " calling event process" << dendl;
 
-    int r = center.process_events(30000000);
+    int r = center.process_events(EventMaxWaitUs);
     if (r < 0) {
       ldout(cct, 20) << __func__ << " process events failed: "
           << cpp_strerror(errno) << dendl;
index e0061aa0914201befac4d694562da8df8e9e8ef5..685799f64bd4b1331fbecb0f53afaf941c175907 100644 (file)
@@ -42,6 +42,8 @@ class AsyncMessenger;
 class WorkerPool;
 
 class Worker : public Thread {
+  static const uint64_t InitEventNumber = 5000;
+  static const uint64_t EventMaxWaitUs = 30000000;
   CephContext *cct;
   WorkerPool *pool;
   bool done;
@@ -51,7 +53,7 @@ class Worker : public Thread {
   EventCenter center;
   Worker(CephContext *c, WorkerPool *p, int i)
     : cct(c), pool(p), done(false), id(i), center(c) {
-    center.init(5000);
+    center.init(InitEventNumber);
   }
   void *entry();
   void stop();
index 99850cfe317b64fb3578ecbd542fb4941ec1ab49..bbb2457fb76fd07db44374713f8a6b7cc5bfb153 100644 (file)
@@ -111,7 +111,7 @@ int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
 {
   int r = 0;
   Mutex::Locker l(file_lock);
-  if (fd > nevent) {
+  if (fd >= nevent) {
     int new_size = nevent << 2;
     while (fd > new_size)
       new_size <<= 2;
@@ -155,6 +155,7 @@ int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
 
 void EventCenter::delete_file_event(int fd, int mask)
 {
+  assert(fd > 0);
   Mutex::Locker l(file_lock);
   if (fd > nevent) {
     ldout(cct, 1) << __func__ << " delete event fd=" << fd << " exceed nevent=" << nevent
index 6424e58192480405d309cbcb761e8ecf4c40e76e..369085109d909330223564db6ce61f1b8861ff85 100644 (file)
@@ -239,6 +239,27 @@ TEST_P(EventDriverTest, NetworkSocketTest) {
   ::close(listen_sd);
 }
 
+class FakeEvent : public EventCallback {
+
+ public:
+  void do_request(int fd_or_id) {}
+};
+
+TEST(EventCenterTest, FileEventExpansion) {
+  vector<int> sds;
+  EventCenter center;
+  center.init(100);
+  EventCallbackRef e(new FakeEvent());
+  for (int i = 0; i < 10000; i++) {
+    int s = ::socket(AF_INET, SOCK_STREAM, 0);
+    center.create_file_event(i, EVENT_READABLE, e);
+    sds.push_back(::socket(AF_INET, SOCK_STREAM, 0));
+  }
+
+  for (vector<int>::iterator it = sds.begin(); it != sds.end(); ++it)
+    center.delete_file_event(*it, EVENT_READABLE);
+}
+
 INSTANTIATE_TEST_CASE_P(
   AsyncMessenger,
   EventDriverTest,