From ce6f22d698868366cb50e1cf2ae1c1e0716d3041 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Wed, 29 Oct 2014 16:48:07 +0800 Subject: [PATCH] AsyncMessenger: Add kqueue support AsyncMessenger will select event driver following epoll, kqueue and select(now not exists) sequence Fix #9926 Signed-off-by: Haomai Wang --- configure.ac | 2 + src/msg/Makefile.am | 29 ++++++-- src/msg/async/EventKqueue.cc | 124 +++++++++++++++++++++++++++++++++++ src/msg/async/EventKqueue.h | 48 ++++++++++++++ 4 files changed, 198 insertions(+), 5 deletions(-) create mode 100644 src/msg/async/EventKqueue.cc create mode 100644 src/msg/async/EventKqueue.h diff --git a/configure.ac b/configure.ac index 7a0691d2390f7..1d9fc104eedd6 100644 --- a/configure.ac +++ b/configure.ac @@ -47,6 +47,7 @@ m4_ifdef([AM_SILENT_RULES], [AM_SILENT_RULES([yes])]) case "${target_os}" in darwin*) AC_DEFINE([DARWIN], [1], [Define if darwin/osx]) + darwin="yes" ;; linux*) linux="yes" @@ -57,6 +58,7 @@ freebsd*) esac AM_CONDITIONAL(LINUX, test x"$linux" = x"yes") AM_CONDITIONAL(FREEBSD, test x"$freebsd" = x"yes") +AM_CONDITIONAL(DARWIN, test x"$darwin" = x"yes") # Checks for programs. AC_PROG_CXX diff --git a/src/msg/Makefile.am b/src/msg/Makefile.am index 892eb6682a70e..68f69bf028922 100644 --- a/src/msg/Makefile.am +++ b/src/msg/Makefile.am @@ -11,7 +11,6 @@ noinst_HEADERS += \ msg/SimplePolicyMessenger.h \ msg/msg_types.h -# simple libmsg_la_SOURCES += \ msg/simple/Accepter.cc \ msg/simple/DispatchQueue.cc \ @@ -21,8 +20,20 @@ libmsg_la_SOURCES += \ msg/async/AsyncConnection.cc \ msg/async/AsyncMessenger.cc \ msg/async/Event.cc \ - msg/async/net_handler.cc \ - msg/async/EventEpoll.cc + msg/async/net_handler.cc + +if LINUX +libmsg_la_SOURCES += msg/async/EventEpoll.cc +endif + +if DARWIN +libmsg_la_SOURCES += msg/async/EventKqueue.cc +endif + +if FREEBSD +libmsg_la_SOURCES += msg/async/EventKqueue.cc +endif + noinst_HEADERS += \ msg/simple/Accepter.h \ @@ -33,10 +44,18 @@ noinst_HEADERS += \ msg/async/AsyncConnection.h \ msg/async/AsyncMessenger.h \ msg/async/Event.h \ - msg/async/EventEpoll.h \ msg/async/net_handler.h +if LINUX +libmsg_la_SOURCES += msg/async/EventEpoll.h +endif -noinst_LTLIBRARIES += libmsg.la +if DARWIN +libmsg_la_SOURCES += msg/async/EventKqueue.h +endif +if FREEBSD +libmsg_la_SOURCES += msg/async/EventKqueue.h +endif +noinst_LTLIBRARIES += libmsg.la diff --git a/src/msg/async/EventKqueue.cc b/src/msg/async/EventKqueue.cc new file mode 100644 index 0000000000000..0c1b109f23057 --- /dev/null +++ b/src/msg/async/EventKqueue.cc @@ -0,0 +1,124 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 UnitedStack + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "common/errno.h" +#include "EventKqueue.h" + +#define dout_subsys ceph_subsys_ms + +#undef dout_prefix +#define dout_prefix *_dout << "KqueueDriver." + +int KqueueDriver::init(int nevent) +{ + events = (struct kevent*)malloc(sizeof(struct kevent)*nevent); + if (!events) { + lderr(cct) << __func__ << " unable to malloc memory: " + << cpp_strerror(errno) << dendl; + return -errno; + } + memset(events, 0, sizeof(struct kevent)*nevent); + + kqfd = kqueue(); + if (kqfd < 0) { + lderr(cct) << __func__ << " unable to do kqueue: " + << cpp_strerror(errno) << dendl; + return -errno; + } + + size = nevent; + + return 0; +} + +int KqueueDriver::add_event(int fd, int cur_mask, int add_mask) +{ + struct kevent ke; + int filter = 0; + filter |= add_mask & EVENT_READABLE ? EVFILT_READ : 0; + filter |= add_mask & EVENT_WRITABLE ? EVFILT_WRITE : 0; + + if (filter) { + EV_SET(&ke, fd, filter, EV_ADD, 0, 0, NULL); + if (kevent(kqfd, &ke, 1, NULL, 0, NULL) == -1) { + lderr(cct) << __func__ << " unable to add event: " + << cpp_strerror(errno) << dendl; + return -1; + } + } + + ldout(cct, 10) << __func__ << " add event to fd=" << fd << " mask=" << filter + << dendl; + return 0; +} + +void KqueueDriver::del_event(int fd, int cur_mask, int delmask) +{ + struct kevent ee; + struct kevent ke; + int filter = 0; + filter |= delmask & EVENT_READABLE ? EVFILT_READ : 0; + filter |= delmask & EVENT_WRITABLE ? EVFILT_WRITE : 0; + + if (filter) { + EV_SET(&ke, fd, filter, EV_DELETE, 0, 0, NULL); + if (kevent(kqfd, &ke, 1, NULL, 0, NULL) < 0) { + lderr(cct) << __func__ << " kevent: delete fd=" << fd << " mask=" << filter + << " failed." << cpp_strerror(errno) << dendl; + } + } + ldout(cct, 10) << __func__ << " del event fd=" << fd << " cur mask=" << filter + << dendl; +} + +int KqueueDriver::resize_events(int newsize) +{ + return 0; +} + +int KqueueDriver::event_wait(vector &fired_events, struct timeval *tvp) +{ + int retval, numevents = 0; + struct timespec timeout; + timeout.tv_sec = tvp->tv_sec; + timeout.tv_nsec = tvp->tv_usec * 1000; + + if (tvp != NULL) { + timeout.tv_sec = tvp->tv_sec; + timeout.tv_nsec = tvp->tv_usec * 1000; + retval = kevent(kqfd, NULL, 0, events, size, &timeout); + } else { + retval = kevent(kqfd, NULL, 0, events, size, NULL); + } + + if (retval > 0) { + int j; + + numevents = retval; + fired_events.resize(numevents); + for (j = 0; j < numevents; j++) { + int mask = 0; + struct kevent *e = events + j; + + if (e->filter == EVFILT_READ) mask |= EVENT_READABLE; + if (e->filter == EVFILT_WRITE) mask |= EVENT_WRITABLE; + fired_events[j].fd = (int)e->ident; + fired_events[j].mask = mask; + + } + } + return numevents; +} diff --git a/src/msg/async/EventKqueue.h b/src/msg/async/EventKqueue.h new file mode 100644 index 0000000000000..aa5fc2865b4d2 --- /dev/null +++ b/src/msg/async/EventKqueue.h @@ -0,0 +1,48 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 UnitedStack + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MSG_EVENTKQUEUE_H +#define CEPH_MSG_EVENTKQUEUE_H + +#include +#include + +#include "Event.h" + +class KqueueDriver : public EventDriver { + int kqfd; + struct kevent *events; + CephContext *cct; + int size; + + public: + KqueueDriver(CephContext *c): kqfd(-1), events(NULL), cct(c) {} + virtual ~KqueueDriver() { + if (kqfd != -1) + close(kqfd); + + if (events) + free(events); + } + + int init(int nevent); + int add_event(int fd, int cur_mask, int add_mask); + void del_event(int fd, int cur_mask, int del_mask); + int resize_events(int newsize); + int event_wait(vector &fired_events, struct timeval *tp); +}; + +#endif -- 2.39.5