]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
src/msg/async/EventKqueue.{h,cc} Added code to restore events on (thread)fork
authorWillem Jan Withagen <wjw@digiware.nl>
Tue, 11 Oct 2016 15:28:23 +0000 (17:28 +0200)
committerWillem Jan Withagen <wjw@digiware.nl>
Sat, 15 Oct 2016 11:04:36 +0000 (13:04 +0200)
According the FreeBSD man page of kqueue(), the kq-descriptors become invalid
upon fork. It looks like the same happens when a kq-handle is created and then
a thread is created.

So we keep a list of assigned events with each kq-handle and when the handle
has beccome invalid, we recreated the kq-handle and the events that go with it.

Signed-off-by: Willem Jan Withagen <wjw@digiware.nl>
src/msg/async/EventKqueue.cc
src/msg/async/EventKqueue.h

index d013c059c85d80fb73a20a103c734f614e1a62bf..d4fa5519021331651d3a5927d07c566d2ff7f7d9 100644 (file)
 #undef dout_prefix
 #define dout_prefix *_dout << "KqueueDriver."
 
+#define KEVENT_NOWAIT 0
+
+int KqueueDriver::test_kqfd() {
+  struct kevent ke[1];
+  // EV_SET(&ke[0], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
+  if (kevent(kqfd, ke, 0, NULL, 0, NULL) == -1) {
+    ldout(cct,0) << __func__ << " invalid kqfd = " << kqfd 
+                 << cpp_strerror(errno) << dendl;
+    return -errno;
+  }
+  return kqfd;
+}
+
+int KqueueDriver::restore_events() {
+  struct kevent ke[2];
+  int i;
+  int num = 0;
+
+  ldout(cct,10) << __func__ << " on kqfd = " << kqfd << dendl;
+  for(i=0;i<size;i++) {
+    if (sav_events[i].mask == 0 )
+      continue;
+    ldout(cct,10) << __func__ << " restore kqfd = " << kqfd 
+       << " fd = " << i << " mask " << sav_events[i].mask
+       << dendl;
+    if (sav_events[i].mask & EVENT_READABLE)
+      EV_SET(&ke[num++], i, EVFILT_READ, EV_ADD, 0, 0, NULL);
+    if (sav_events[i].mask & EVENT_WRITABLE)
+      EV_SET(&ke[num++], i, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
+    if (num) {
+      if (kevent(kqfd, ke, num, NULL, 0, NULL) == -1) {
+        ldout(cct,0) << __func__ << " unable to add event: "
+                     << cpp_strerror(errno) << dendl;
+        return -errno;
+      }
+    }
+  }
+  return 0;
+}
+
 int KqueueDriver::init(int nevent)
 {
-  events = (struct kevent*)malloc(sizeof(struct kevent)*nevent);
-  if (!events) {
+  // keep track of possible changes of our thread
+  // because change of thread kills the kqfd
+  int oldkqfd = kqfd;  
+  mythread = pthread_self();
+
+  // Reserve the space to accept the kevent return events.
+  res_events = (struct kevent*)malloc(sizeof(struct kevent)*nevent);
+  if (!res_events) {
     lderr(cct) << __func__ << " unable to malloc memory: "
                            << cpp_strerror(errno) << dendl;
     return -ENOMEM;
   }
-  memset(events, 0, sizeof(struct kevent)*nevent);
+  memset(res_events, 0, sizeof(struct kevent)*nevent);
 
-  kqfd = kqueue();
-  if (kqfd < 0) {
-    lderr(cct) << __func__ << " unable to do kqueue: "
+  // Reserve the space to keep all of the events set, so it can be redone
+  // when we change trhread ID. 
+  sav_events = (struct SaveEvent*)malloc(sizeof(struct SaveEvent)*nevent);
+  if (!sav_events) {
+    lderr(cct) << __func__ << " unable to malloc memory: "
                            << cpp_strerror(errno) << dendl;
-    return -errno;
+    return -ENOMEM;
   }
+  memset(sav_events, 0, sizeof(struct SaveEvent)*nevent);
 
+  // Delay assigning a descriptor until it is really needed.
+  // kqfd = kqueue();
+  kqfd = -1;
+  
   size = nevent;
 
   return 0;
@@ -46,10 +99,41 @@ int KqueueDriver::init(int nevent)
 
 int KqueueDriver::add_event(int fd, int cur_mask, int add_mask)
 {
-  ldout(cct, 20) << __func__ << " add event fd=" << fd << " cur_mask=" << cur_mask
-                 << "add_mask" << add_mask << dendl;
   struct kevent ke[2];
+  int events = 0;
+  int filter = 0;
   int num = 0;
+  int oldkqfd = kqfd;
+
+  if (!pthread_equal(mythread, pthread_self())) {
+    ldout(cct,10) << __func__ << " We changed thread from " << mythread << " to " << pthread_self() << dendl;
+    mythread = pthread_self();
+    kqfd = -1;
+  } else if ((kqfd != -1) && (test_kqfd() < 0)) {
+    ldout(cct,10) << __func__ << " Recreating old kqfd."  << dendl;
+    kqfd = -1;
+  }
+  if (kqfd == -1) {
+    kqfd = kqueue();
+    ldout(cct,10) << __func__ << " kqueue: new kqfd = " << kqfd 
+                 << " (was: " << oldkqfd << ")" 
+                 << dendl;
+    if (kqfd < 0) {
+      lderr(cct) << __func__ << " unable to do kqueue: "
+                             << cpp_strerror(errno) << dendl;
+      return -errno;
+    }
+    if (restore_events()< 0) {
+      lderr(cct) << __func__ << " unable restore all events "
+                             << cpp_strerror(errno) << dendl;
+      return -errno;
+    }
+  }
+
+  ldout(cct,10) << __func__ << " add event kqfd = " << kqfd << " fd = " << fd 
+       << " cur_mask = " << cur_mask << " add_mask = " << add_mask 
+       << dendl;
+
   if (add_mask & EVENT_READABLE)
     EV_SET(&ke[num++], fd, EVFILT_READ, EV_ADD|EV_CLEAR, 0, 0, NULL);
   if (add_mask & EVENT_WRITABLE)
@@ -62,17 +146,43 @@ int KqueueDriver::add_event(int fd, int cur_mask, int add_mask)
       return -errno;
     }
   }
-
+  // keep what we set
+  sav_events[fd].mask = cur_mask | add_mask;
   return 0;
 }
 
-int KqueueDriver::del_event(int fd, int cur_mask, int delmask)
+int KqueueDriver::del_event(int fd, int cur_mask, int del_mask)
 {
-  ldout(cct, 20) << __func__ << " del event fd=" << fd << " cur mask=" << cur_mask
-                 << " delmask=" << delmask << dendl;
   struct kevent ke[2];
   int num = 0;
-  int mask = cur_mask & delmask;
+  int mask = cur_mask & del_mask;
+
+  ldout(cct,10) << __func__ << " delete event kqfd = " << kqfd 
+       << " fd = " << fd << " cur_mask = " << cur_mask 
+       << " del_mask = " << del_mask << dendl;
+
+  if (!pthread_equal(mythread, pthread_self())) {
+    ldout(cct,10) << __func__ << " We changed thread from " << mythread << " to " << pthread_self() << dendl;
+    mythread = pthread_self();
+  } else if ((kqfd != -1) && (test_kqfd() < 0)) {
+    ldout(cct,10) << __func__ << " Recreating old kqfd."  << dendl;
+    kqfd = -1;
+  } 
+  if (kqfd == -1) {
+    kqfd = kqueue();
+    ldout(cct,10) << __func__ << " kqueue: new kqfd = " << kqfd << dendl;
+    if (kqfd < 0) {
+      lderr(cct) << __func__ << " unable to do kqueue: "
+                             << cpp_strerror(errno) << dendl;
+      return -errno;
+    }
+    if (restore_events()< 0) {
+      lderr(cct) << __func__ << " unable restore all events "
+                             << cpp_strerror(errno) << dendl;
+      return -errno;
+    }
+  }
+
   if (mask & EVENT_READABLE)
     EV_SET(&ke[num++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
   if (mask & EVENT_WRITABLE)
@@ -86,6 +196,8 @@ int KqueueDriver::del_event(int fd, int cur_mask, int delmask)
       return -errno;
     }
   }
+  // keep the administration
+  sav_events[fd].mask = cur_mask & ~del_mask;
   return 0;
 }
 
@@ -99,22 +211,66 @@ int KqueueDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeva
   int retval, numevents = 0;
   struct timespec timeout;
 
+  ldout(cct,10) << __func__ << " kqfd = " << kqfd << dendl;
+  if (!pthread_equal(mythread, pthread_self())) {
+    ldout(cct,10) << __func__ << " We changed thread from " << mythread << " to " << pthread_self() << dendl;
+    mythread = pthread_self();
+    kqfd = -1;
+  } else if ((kqfd != -1) && (test_kqfd() < 0)) {
+    ldout(cct,10) << __func__ << " Recreating old kqfd."  << dendl;
+    kqfd = -1;
+  } 
+  if (kqfd == -1) {
+    kqfd = kqueue();
+    ldout(cct,10) << __func__ << " kqueue: new kqfd = " << kqfd << dendl;
+    if (kqfd < 0) {
+      lderr(cct) << __func__ << " unable to do kqueue: "
+                             << cpp_strerror(errno) << dendl;
+      return -errno;
+    }
+    if (restore_events()< 0) {
+      lderr(cct) << __func__ << " unable restore all events "
+                             << cpp_strerror(errno) << dendl;
+      return -errno;
+    }
+  }
+
   if (tvp != NULL) {
       timeout.tv_sec = tvp->tv_sec;
       timeout.tv_nsec = tvp->tv_usec * 1000;
-      retval = kevent(kqfd, NULL, 0, events, size, &timeout);
+      ldout(cct,10) << __func__ << " "
+               << timeout.tv_sec << " sec "
+               << timeout.tv_nsec << " nsec"
+               << dendl;
+      retval = kevent(kqfd, NULL, 0, res_events, size, &timeout);
   } else {
-      retval = kevent(kqfd, NULL, 0, events, size, NULL);
+      ldout(cct,10) << __func__ << " event_wait: "
+               << " NULL"
+               << dendl;
+      retval = kevent(kqfd, NULL, 0, res_events, size, NULL);
   }
 
-  if (retval > 0) {
+  ldout(cct,10) << __func__ << " kevent retval: "
+               << retval
+               << dendl;
+  if (retval < 0) {
+    lderr(cct) << __func__ << " kqueue error: "
+                           << cpp_strerror(errno) << dendl;
+    return -errno;
+  } else if (retval == 0) {
+    ldout(cct,5) << __func__ << " Hit timeout("
+                 << timeout.tv_sec << " sec "
+                 << timeout.tv_nsec << " nsec"
+                << ")." << dendl;
+  } else {
     int j;
 
     numevents = retval;
     fired_events.resize(numevents);
     for (j = 0; j < numevents; j++) {
       int mask = 0;
-      struct kevent *e = events + j;
+      struct kevent *e = res_events + j;
+      ldout(cct,10) << __func__ << " j: " << j << dendl;
 
       if (e->filter == EVFILT_READ) mask |= EVENT_READABLE;
       if (e->filter == EVFILT_WRITE) mask |= EVENT_WRITABLE;
index bb6e023430fd063e01c0d9ffa5e938f19444c9b0..2f14b5f28e94933892ce5ac7833a9ce1a060857c 100644 (file)
 
 class KqueueDriver : public EventDriver {
   int kqfd;
-  struct kevent *events;
+  pthread_t mythread;
+  struct kevent *res_events;
   CephContext *cct;
   int size;
 
+  // Keep what we set on the kqfd
+  struct SaveEvent{
+    int fd;
+    int mask;
+  };
+  struct SaveEvent *sav_events;
+  int sav_index;
+  int restore_events();
+  int test_kqfd();
+
  public:
-  explicit KqueueDriver(CephContext *c): kqfd(-1), events(NULL), cct(c), size(0) {}
+  explicit KqueueDriver(CephContext *c): kqfd(-1), res_events(NULL), cct(c), size(0) {}
   virtual ~KqueueDriver() {
     if (kqfd != -1)
       close(kqfd);
 
-    if (events)
-      free(events);
+    if (res_events)
+      free(res_events);
+    if (sav_events)
+      free(sav_events);
+    sav_index = 0;
   }
 
   int init(int nevent) override;