]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
src/msg/async/EventKqueue: refactor and add test_thread_change() 11430/head
authorWillem Jan Withagen <wjw@digiware.nl>
Wed, 12 Oct 2016 10:27:42 +0000 (12:27 +0200)
committerWillem Jan Withagen <wjw@digiware.nl>
Sat, 15 Oct 2016 11:09:51 +0000 (13:09 +0200)
Signed-off-by: Willem Jan Withagen <wjw@digiware.nl>
src/msg/async/EventKqueue.cc
src/msg/async/EventKqueue.h

index 076ceb547a86469bd265b4e8ebf69a469f1623ec..cbd12449d0204ce9164f84cce32fea302c20f0e9 100644 (file)
@@ -26,7 +26,7 @@
 
 int KqueueDriver::test_kqfd() {
   struct kevent ke[1];
-  if (kevent(kqfd, ke, 0, NULL, 0, NULL) == -1) {
+  if (kevent(kqfd, ke, 0, NULL, 0, KEVENT_NOWAIT) == -1) {
     ldout(cct,0) << __func__ << " invalid kqfd = " << kqfd 
                  << cpp_strerror(errno) << dendl;
     return -errno;
@@ -37,21 +37,20 @@ int KqueueDriver::test_kqfd() {
 int KqueueDriver::restore_events() {
   struct kevent ke[2];
   int i;
-  int num = 0;
 
   ldout(cct,10) << __func__ << " on kqfd = " << kqfd << dendl;
   for(i=0;i<size;i++) {
+    int num = 0;
     if (sav_events[i].mask == 0 )
       continue;
     ldout(cct,10) << __func__ << " restore kqfd = " << kqfd 
-       << " fd = " << i << " mask " << sav_events[i].mask
-       << dendl;
+                  << " fd = " << i << " mask " << sav_events[i].mask << dendl;
     if (sav_events[i].mask & EVENT_READABLE)
       EV_SET(&ke[num++], i, EVFILT_READ, EV_ADD, 0, 0, NULL);
     if (sav_events[i].mask & EVENT_WRITABLE)
       EV_SET(&ke[num++], i, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
     if (num) {
-      if (kevent(kqfd, ke, num, NULL, 0, NULL) == -1) {
+      if (kevent(kqfd, ke, num, NULL, 0, KEVENT_NOWAIT) == -1) {
         ldout(cct,0) << __func__ << " unable to add event: "
                      << cpp_strerror(errno) << dendl;
         return -errno;
@@ -61,11 +60,47 @@ int KqueueDriver::restore_events() {
   return 0;
 }
 
+int KqueueDriver::test_thread_change(const char* funcname) {
+  // check to see if we changed thread, because that invalidates
+  // the kqfd and we need to restore that
+  int oldkqfd = kqfd;
+
+  if (!pthread_equal(mythread, pthread_self())) {
+    ldout(cct,10) << funcname << " We changed thread from " << mythread
+                  << " to " << pthread_self() << dendl;
+    mythread = pthread_self();
+    kqfd = -1;
+  } else if ((kqfd != -1) && (test_kqfd() < 0)) {
+    // should this ever happen?
+    // It would be strange to change kqfd with thread change.
+    // Might nee to change this into an assert() in the future.
+    ldout(cct,0) << funcname << " Warning: Recreating old kqfd. "
+                 << "This should not happen!!!"  << dendl;
+    kqfd = -1;
+  }
+  if (kqfd == -1) {
+    kqfd = kqueue();
+    ldout(cct,10) << funcname << " kqueue: new kqfd = " << kqfd
+                  << " (was: " << oldkqfd << ")"
+                  << dendl;
+    if (kqfd < 0) {
+      lderr(cct) << funcname << " unable to do kqueue: "
+                             << cpp_strerror(errno) << dendl;
+      return -errno;
+    }
+    if (restore_events()< 0) {
+      lderr(cct) << funcname << " unable restore all events "
+                             << cpp_strerror(errno) << dendl;
+      return -errno;
+    }
+  }
+  return 0;
+}
+
 int KqueueDriver::init(int nevent)
 {
   // keep track of possible changes of our thread
   // because change of thread kills the kqfd
-  int oldkqfd = kqfd;  
   mythread = pthread_self();
 
   // Reserve the space to accept the kevent return events.
@@ -76,6 +111,7 @@ int KqueueDriver::init(int nevent)
     return -ENOMEM;
   }
   memset(res_events, 0, sizeof(struct kevent)*nevent);
+  size = nevent;
 
   // Reserve the space to keep all of the events set, so it can be redone
   // when we change trhread ID. 
@@ -86,67 +122,42 @@ int KqueueDriver::init(int nevent)
     return -ENOMEM;
   }
   memset(sav_events, 0, sizeof(struct SaveEvent)*nevent);
+  sav_max = nevent;
 
   // Delay assigning a descriptor until it is really needed.
   // kqfd = kqueue();
   kqfd = -1;
-  
-  size = nevent;
-
   return 0;
 }
 
 int KqueueDriver::add_event(int fd, int cur_mask, int add_mask)
 {
   struct kevent ke[2];
-  int events = 0;
-  int filter = 0;
   int num = 0;
-  int oldkqfd = kqfd;
-
-  if (!pthread_equal(mythread, pthread_self())) {
-    ldout(cct,10) << __func__ << " We changed thread from " << mythread 
-                 << " to " << pthread_self() << dendl;
-    mythread = pthread_self();
-    kqfd = -1;
-  } else if ((kqfd != -1) && (test_kqfd() < 0)) {
-    ldout(cct,10) << __func__ << " Recreating old kqfd."  << dendl;
-    kqfd = -1;
-  }
-  if (kqfd == -1) {
-    kqfd = kqueue();
-    ldout(cct,10) << __func__ << " kqueue: new kqfd = " << kqfd 
-                 << " (was: " << oldkqfd << ")" 
-                 << dendl;
-    if (kqfd < 0) {
-      lderr(cct) << __func__ << " unable to do kqueue: "
-                             << cpp_strerror(errno) << dendl;
-      return -errno;
-    }
-    if (restore_events()< 0) {
-      lderr(cct) << __func__ << " unable restore all events "
-                             << cpp_strerror(errno) << dendl;
-      return -errno;
-    }
-  }
 
   ldout(cct,20) << __func__ << " add event kqfd = " << kqfd << " fd = " << fd 
        << " cur_mask = " << cur_mask << " add_mask = " << add_mask 
        << dendl;
 
+  int r = test_thread_change(__func__);
+  if ( r < 0 )
+    return r;
+
   if (add_mask & EVENT_READABLE)
     EV_SET(&ke[num++], fd, EVFILT_READ, EV_ADD|EV_CLEAR, 0, 0, NULL);
   if (add_mask & EVENT_WRITABLE)
     EV_SET(&ke[num++], fd, EVFILT_WRITE, EV_ADD|EV_CLEAR, 0, 0, NULL);
 
   if (num) {
-    if (kevent(kqfd, ke, num, NULL, 0, NULL) == -1) {
+    if (kevent(kqfd, ke, num, NULL, 0, KEVENT_NOWAIT) == -1) {
       lderr(cct) << __func__ << " unable to add event: "
                              << cpp_strerror(errno) << dendl;
       return -errno;
     }
   }
   // keep what we set
+  if (fd >= sav_max)
+    resize_events(sav_max+5000);
   sav_events[fd].mask = cur_mask | add_mask;
   return 0;
 }
@@ -161,28 +172,9 @@ int KqueueDriver::del_event(int fd, int cur_mask, int del_mask)
        << " fd = " << fd << " cur_mask = " << cur_mask 
        << " del_mask = " << del_mask << dendl;
 
-  if (!pthread_equal(mythread, pthread_self())) {
-    ldout(cct,10) << __func__ << " We changed thread from " << mythread 
-                 << " to " << pthread_self() << dendl;
-    mythread = pthread_self();
-  } else if ((kqfd != -1) && (test_kqfd() < 0)) {
-    ldout(cct,10) << __func__ << " Recreating old kqfd."  << dendl;
-    kqfd = -1;
-  } 
-  if (kqfd == -1) {
-    kqfd = kqueue();
-    ldout(cct,10) << __func__ << " kqueue: new kqfd = " << kqfd << dendl;
-    if (kqfd < 0) {
-      lderr(cct) << __func__ << " unable to do kqueue: "
-                             << cpp_strerror(errno) << dendl;
-      return -errno;
-    }
-    if (restore_events()< 0) {
-      lderr(cct) << __func__ << " unable restore all events "
-                             << cpp_strerror(errno) << dendl;
-      return -errno;
-    }
-  }
+  int r = test_thread_change(__func__);
+  if ( r < 0 )
+    return r;
 
   if (mask & EVENT_READABLE)
     EV_SET(&ke[num++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
@@ -191,7 +183,7 @@ int KqueueDriver::del_event(int fd, int cur_mask, int del_mask)
 
   if (num) {
     int r = 0;
-    if ((r = kevent(kqfd, ke, num, NULL, 0, NULL)) < 0) {
+    if ((r = kevent(kqfd, ke, num, NULL, 0, KEVENT_NOWAIT)) < 0) {
       lderr(cct) << __func__ << " kevent: delete fd=" << fd << " mask=" << mask
                  << " failed." << cpp_strerror(errno) << dendl;
       return -errno;
@@ -204,6 +196,19 @@ int KqueueDriver::del_event(int fd, int cur_mask, int del_mask)
 
 int KqueueDriver::resize_events(int newsize)
 {
+  ldout(cct,10) << __func__ << " kqfd = " << kqfd << "newsize = " << newsize 
+                << dendl;
+  if(newsize > sav_max) {
+    sav_events = (struct SaveEvent*)realloc( sav_events, 
+                    sizeof(struct SaveEvent)*newsize);
+    if (!sav_events) {
+      lderr(cct) << __func__ << " unable to realloc memory: "
+                             << cpp_strerror(errno) << dendl;
+      return -ENOMEM;
+    }
+    memset(&sav_events[size], 0, sizeof(struct SaveEvent)*(newsize-sav_max));
+    sav_max = newsize;
+  }
   return 0;
 }
 
@@ -213,29 +218,10 @@ int KqueueDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeva
   struct timespec timeout;
 
   ldout(cct,10) << __func__ << " kqfd = " << kqfd << dendl;
-  if (!pthread_equal(mythread, pthread_self())) {
-    ldout(cct,10) << __func__ << " We changed thread from " << mythread 
-                 << " to " << pthread_self() << dendl;
-    mythread = pthread_self();
-    kqfd = -1;
-  } else if ((kqfd != -1) && (test_kqfd() < 0)) {
-    ldout(cct,10) << __func__ << " Recreating old kqfd."  << dendl;
-    kqfd = -1;
-  } 
-  if (kqfd == -1) {
-    kqfd = kqueue();
-    ldout(cct,10) << __func__ << " kqueue: new kqfd = " << kqfd << dendl;
-    if (kqfd < 0) {
-      lderr(cct) << __func__ << " unable to do kqueue: "
-                             << cpp_strerror(errno) << dendl;
-      return -errno;
-    }
-    if (restore_events()< 0) {
-      lderr(cct) << __func__ << " unable restore all events "
-                             << cpp_strerror(errno) << dendl;
-      return -errno;
-    }
-  }
+
+  int r = test_thread_change(__func__);
+  if ( r < 0 )
+    return r;
 
   if (tvp != NULL) {
       timeout.tv_sec = tvp->tv_sec;
@@ -246,15 +232,11 @@ int KqueueDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeva
                << dendl;
       retval = kevent(kqfd, NULL, 0, res_events, size, &timeout);
   } else {
-      ldout(cct,20) << __func__ << " event_wait: "
-               << " NULL"
-               << dendl;
-      retval = kevent(kqfd, NULL, 0, res_events, size, NULL);
+      ldout(cct,20) << __func__ << " event_wait: " << " NULL" << dendl;
+      retval = kevent(kqfd, NULL, 0, res_events, size, KEVENT_NOWAIT);
   }
 
-  ldout(cct,25) << __func__ << " kevent retval: "
-               << retval
-               << dendl;
+  ldout(cct,25) << __func__ << " kevent retval: " << retval << dendl;
   if (retval < 0) {
     lderr(cct) << __func__ << " kqueue error: "
                            << cpp_strerror(errno) << dendl;
@@ -272,7 +254,6 @@ int KqueueDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeva
     for (j = 0; j < numevents; j++) {
       int mask = 0;
       struct kevent *e = res_events + j;
-      ldout(cct,20) << __func__ << " j: " << j << dendl;
 
       if (e->filter == EVFILT_READ) mask |= EVENT_READABLE;
       if (e->filter == EVFILT_WRITE) mask |= EVENT_WRITABLE;
index 2f14b5f28e94933892ce5ac7833a9ce1a060857c..5183e438193b55c36575d59b2edce95f40e7d0ec 100644 (file)
@@ -36,21 +36,24 @@ class KqueueDriver : public EventDriver {
     int mask;
   };
   struct SaveEvent *sav_events;
-  int sav_index;
+  int sav_max;
   int restore_events();
   int test_kqfd();
+  int test_thread_change(const char* funcname);
 
  public:
-  explicit KqueueDriver(CephContext *c): kqfd(-1), res_events(NULL), cct(c), size(0) {}
+  explicit KqueueDriver(CephContext *c): kqfd(-1), res_events(NULL), cct(c), 
+               size(0), sav_max(0) {}
   virtual ~KqueueDriver() {
     if (kqfd != -1)
       close(kqfd);
 
     if (res_events)
       free(res_events);
+    size = 0;
     if (sav_events)
       free(sav_events);
-    sav_index = 0;
+    sav_max = 0;
   }
 
   int init(int nevent) override;