From 4e3ed5ad04899a5f3eb9331cac8556b05ee41fc2 Mon Sep 17 00:00:00 2001 From: Willem Jan Withagen Date: Tue, 11 Oct 2016 17:28:23 +0200 Subject: [PATCH] src/msg/async/EventKqueue.{h,cc} Added code to restore events on (thread)fork According the FreeBSD man page of kqueue(), the kq-descriptors become invalid upon fork. It looks like the same happens when a kq-handle is created and then a thread is created. So we keep a list of assigned events with each kq-handle and when the handle has beccome invalid, we recreated the kq-handle and the events that go with it. Signed-off-by: Willem Jan Withagen --- src/msg/async/EventKqueue.cc | 192 +++++++++++++++++++++++++++++++---- src/msg/async/EventKqueue.h | 22 +++- 2 files changed, 192 insertions(+), 22 deletions(-) diff --git a/src/msg/async/EventKqueue.cc b/src/msg/async/EventKqueue.cc index d013c059c85d8..d4fa551902133 100644 --- a/src/msg/async/EventKqueue.cc +++ b/src/msg/async/EventKqueue.cc @@ -22,23 +22,76 @@ #undef dout_prefix #define dout_prefix *_dout << "KqueueDriver." +#define KEVENT_NOWAIT 0 + +int KqueueDriver::test_kqfd() { + struct kevent ke[1]; + // EV_SET(&ke[0], fd, EVFILT_READ, EV_ADD, 0, 0, NULL); + if (kevent(kqfd, ke, 0, NULL, 0, NULL) == -1) { + ldout(cct,0) << __func__ << " invalid kqfd = " << kqfd + << cpp_strerror(errno) << dendl; + return -errno; + } + return kqfd; +} + +int KqueueDriver::restore_events() { + struct kevent ke[2]; + int i; + int num = 0; + + ldout(cct,10) << __func__ << " on kqfd = " << kqfd << dendl; + for(i=0;i &fired_events, struct timeva int retval, numevents = 0; struct timespec timeout; + ldout(cct,10) << __func__ << " kqfd = " << kqfd << dendl; + if (!pthread_equal(mythread, pthread_self())) { + ldout(cct,10) << __func__ << " We changed thread from " << mythread << " to " << pthread_self() << dendl; + mythread = pthread_self(); + kqfd = -1; + } else if ((kqfd != -1) && (test_kqfd() < 0)) { + ldout(cct,10) << __func__ << " Recreating old kqfd." << dendl; + kqfd = -1; + } + if (kqfd == -1) { + kqfd = kqueue(); + ldout(cct,10) << __func__ << " kqueue: new kqfd = " << kqfd << dendl; + if (kqfd < 0) { + lderr(cct) << __func__ << " unable to do kqueue: " + << cpp_strerror(errno) << dendl; + return -errno; + } + if (restore_events()< 0) { + lderr(cct) << __func__ << " unable restore all events " + << cpp_strerror(errno) << dendl; + return -errno; + } + } + if (tvp != NULL) { timeout.tv_sec = tvp->tv_sec; timeout.tv_nsec = tvp->tv_usec * 1000; - retval = kevent(kqfd, NULL, 0, events, size, &timeout); + ldout(cct,10) << __func__ << " " + << timeout.tv_sec << " sec " + << timeout.tv_nsec << " nsec" + << dendl; + retval = kevent(kqfd, NULL, 0, res_events, size, &timeout); } else { - retval = kevent(kqfd, NULL, 0, events, size, NULL); + ldout(cct,10) << __func__ << " event_wait: " + << " NULL" + << dendl; + retval = kevent(kqfd, NULL, 0, res_events, size, NULL); } - if (retval > 0) { + ldout(cct,10) << __func__ << " kevent retval: " + << retval + << dendl; + if (retval < 0) { + lderr(cct) << __func__ << " kqueue error: " + << cpp_strerror(errno) << dendl; + return -errno; + } else if (retval == 0) { + ldout(cct,5) << __func__ << " Hit timeout(" + << timeout.tv_sec << " sec " + << timeout.tv_nsec << " nsec" + << ")." << dendl; + } else { int j; numevents = retval; fired_events.resize(numevents); for (j = 0; j < numevents; j++) { int mask = 0; - struct kevent *e = events + j; + struct kevent *e = res_events + j; + ldout(cct,10) << __func__ << " j: " << j << dendl; if (e->filter == EVFILT_READ) mask |= EVENT_READABLE; if (e->filter == EVFILT_WRITE) mask |= EVENT_WRITABLE; diff --git a/src/msg/async/EventKqueue.h b/src/msg/async/EventKqueue.h index bb6e023430fd0..2f14b5f28e949 100644 --- a/src/msg/async/EventKqueue.h +++ b/src/msg/async/EventKqueue.h @@ -25,18 +25,32 @@ class KqueueDriver : public EventDriver { int kqfd; - struct kevent *events; + pthread_t mythread; + struct kevent *res_events; CephContext *cct; int size; + // Keep what we set on the kqfd + struct SaveEvent{ + int fd; + int mask; + }; + struct SaveEvent *sav_events; + int sav_index; + int restore_events(); + int test_kqfd(); + public: - explicit KqueueDriver(CephContext *c): kqfd(-1), events(NULL), cct(c), size(0) {} + explicit KqueueDriver(CephContext *c): kqfd(-1), res_events(NULL), cct(c), size(0) {} virtual ~KqueueDriver() { if (kqfd != -1) close(kqfd); - if (events) - free(events); + if (res_events) + free(res_events); + if (sav_events) + free(sav_events); + sav_index = 0; } int init(int nevent) override; -- 2.39.5