From: Willem Jan Withagen Date: Wed, 12 Oct 2016 10:27:42 +0000 (+0200) Subject: src/msg/async/EventKqueue: refactor and add test_thread_change() X-Git-Tag: v11.1.0~630^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F11430%2Fhead;p=ceph.git src/msg/async/EventKqueue: refactor and add test_thread_change() Signed-off-by: Willem Jan Withagen --- diff --git a/src/msg/async/EventKqueue.cc b/src/msg/async/EventKqueue.cc index 076ceb547a864..cbd12449d0204 100644 --- a/src/msg/async/EventKqueue.cc +++ b/src/msg/async/EventKqueue.cc @@ -26,7 +26,7 @@ int KqueueDriver::test_kqfd() { struct kevent ke[1]; - if (kevent(kqfd, ke, 0, NULL, 0, NULL) == -1) { + if (kevent(kqfd, ke, 0, NULL, 0, KEVENT_NOWAIT) == -1) { ldout(cct,0) << __func__ << " invalid kqfd = " << kqfd << cpp_strerror(errno) << dendl; return -errno; @@ -37,21 +37,20 @@ int KqueueDriver::test_kqfd() { int KqueueDriver::restore_events() { struct kevent ke[2]; int i; - int num = 0; ldout(cct,10) << __func__ << " on kqfd = " << kqfd << dendl; for(i=0;i= sav_max) + resize_events(sav_max+5000); sav_events[fd].mask = cur_mask | add_mask; return 0; } @@ -161,28 +172,9 @@ int KqueueDriver::del_event(int fd, int cur_mask, int del_mask) << " fd = " << fd << " cur_mask = " << cur_mask << " del_mask = " << del_mask << dendl; - if (!pthread_equal(mythread, pthread_self())) { - ldout(cct,10) << __func__ << " We changed thread from " << mythread - << " to " << pthread_self() << dendl; - mythread = pthread_self(); - } else if ((kqfd != -1) && (test_kqfd() < 0)) { - ldout(cct,10) << __func__ << " Recreating old kqfd." << dendl; - kqfd = -1; - } - if (kqfd == -1) { - kqfd = kqueue(); - ldout(cct,10) << __func__ << " kqueue: new kqfd = " << kqfd << dendl; - if (kqfd < 0) { - lderr(cct) << __func__ << " unable to do kqueue: " - << cpp_strerror(errno) << dendl; - return -errno; - } - if (restore_events()< 0) { - lderr(cct) << __func__ << " unable restore all events " - << cpp_strerror(errno) << dendl; - return -errno; - } - } + int r = test_thread_change(__func__); + if ( r < 0 ) + return r; if (mask & EVENT_READABLE) EV_SET(&ke[num++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); @@ -191,7 +183,7 @@ int KqueueDriver::del_event(int fd, int cur_mask, int del_mask) if (num) { int r = 0; - if ((r = kevent(kqfd, ke, num, NULL, 0, NULL)) < 0) { + if ((r = kevent(kqfd, ke, num, NULL, 0, KEVENT_NOWAIT)) < 0) { lderr(cct) << __func__ << " kevent: delete fd=" << fd << " mask=" << mask << " failed." << cpp_strerror(errno) << dendl; return -errno; @@ -204,6 +196,19 @@ int KqueueDriver::del_event(int fd, int cur_mask, int del_mask) int KqueueDriver::resize_events(int newsize) { + ldout(cct,10) << __func__ << " kqfd = " << kqfd << "newsize = " << newsize + << dendl; + if(newsize > sav_max) { + sav_events = (struct SaveEvent*)realloc( sav_events, + sizeof(struct SaveEvent)*newsize); + if (!sav_events) { + lderr(cct) << __func__ << " unable to realloc memory: " + << cpp_strerror(errno) << dendl; + return -ENOMEM; + } + memset(&sav_events[size], 0, sizeof(struct SaveEvent)*(newsize-sav_max)); + sav_max = newsize; + } return 0; } @@ -213,29 +218,10 @@ int KqueueDriver::event_wait(vector &fired_events, struct timeva struct timespec timeout; ldout(cct,10) << __func__ << " kqfd = " << kqfd << dendl; - if (!pthread_equal(mythread, pthread_self())) { - ldout(cct,10) << __func__ << " We changed thread from " << mythread - << " to " << pthread_self() << dendl; - mythread = pthread_self(); - kqfd = -1; - } else if ((kqfd != -1) && (test_kqfd() < 0)) { - ldout(cct,10) << __func__ << " Recreating old kqfd." << dendl; - kqfd = -1; - } - if (kqfd == -1) { - kqfd = kqueue(); - ldout(cct,10) << __func__ << " kqueue: new kqfd = " << kqfd << dendl; - if (kqfd < 0) { - lderr(cct) << __func__ << " unable to do kqueue: " - << cpp_strerror(errno) << dendl; - return -errno; - } - if (restore_events()< 0) { - lderr(cct) << __func__ << " unable restore all events " - << cpp_strerror(errno) << dendl; - return -errno; - } - } + + int r = test_thread_change(__func__); + if ( r < 0 ) + return r; if (tvp != NULL) { timeout.tv_sec = tvp->tv_sec; @@ -246,15 +232,11 @@ int KqueueDriver::event_wait(vector &fired_events, struct timeva << dendl; retval = kevent(kqfd, NULL, 0, res_events, size, &timeout); } else { - ldout(cct,20) << __func__ << " event_wait: " - << " NULL" - << dendl; - retval = kevent(kqfd, NULL, 0, res_events, size, NULL); + ldout(cct,20) << __func__ << " event_wait: " << " NULL" << dendl; + retval = kevent(kqfd, NULL, 0, res_events, size, KEVENT_NOWAIT); } - ldout(cct,25) << __func__ << " kevent retval: " - << retval - << dendl; + ldout(cct,25) << __func__ << " kevent retval: " << retval << dendl; if (retval < 0) { lderr(cct) << __func__ << " kqueue error: " << cpp_strerror(errno) << dendl; @@ -272,7 +254,6 @@ int KqueueDriver::event_wait(vector &fired_events, struct timeva for (j = 0; j < numevents; j++) { int mask = 0; struct kevent *e = res_events + j; - ldout(cct,20) << __func__ << " j: " << j << dendl; if (e->filter == EVFILT_READ) mask |= EVENT_READABLE; if (e->filter == EVFILT_WRITE) mask |= EVENT_WRITABLE; diff --git a/src/msg/async/EventKqueue.h b/src/msg/async/EventKqueue.h index 2f14b5f28e949..5183e438193b5 100644 --- a/src/msg/async/EventKqueue.h +++ b/src/msg/async/EventKqueue.h @@ -36,21 +36,24 @@ class KqueueDriver : public EventDriver { int mask; }; struct SaveEvent *sav_events; - int sav_index; + int sav_max; int restore_events(); int test_kqfd(); + int test_thread_change(const char* funcname); public: - explicit KqueueDriver(CephContext *c): kqfd(-1), res_events(NULL), cct(c), size(0) {} + explicit KqueueDriver(CephContext *c): kqfd(-1), res_events(NULL), cct(c), + size(0), sav_max(0) {} virtual ~KqueueDriver() { if (kqfd != -1) close(kqfd); if (res_events) free(res_events); + size = 0; if (sav_events) free(sav_events); - sav_index = 0; + sav_max = 0; } int init(int nevent) override;