]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncMessenger: Add kqueue support 2831/head
authorHaomai Wang <haomaiwang@gmail.com>
Wed, 29 Oct 2014 08:48:07 +0000 (16:48 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Wed, 29 Oct 2014 17:15:33 +0000 (01:15 +0800)
AsyncMessenger will select event driver following epoll, kqueue and
select(now not exists) sequence

Fix #9926
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
configure.ac
src/msg/Makefile.am
src/msg/async/EventKqueue.cc [new file with mode: 0644]
src/msg/async/EventKqueue.h [new file with mode: 0644]

index 7a0691d2390f7ee928489a482024659c06cc2dfc..1d9fc104eedd6164cd5cf65be1b938388574419a 100644 (file)
@@ -47,6 +47,7 @@ m4_ifdef([AM_SILENT_RULES], [AM_SILENT_RULES([yes])])
 case "${target_os}" in
 darwin*)
        AC_DEFINE([DARWIN], [1], [Define if darwin/osx])
+    darwin="yes"
        ;;
 linux*)
        linux="yes"
@@ -57,6 +58,7 @@ freebsd*)
 esac
 AM_CONDITIONAL(LINUX, test x"$linux" = x"yes")
 AM_CONDITIONAL(FREEBSD, test x"$freebsd" = x"yes")
+AM_CONDITIONAL(DARWIN, test x"$darwin" = x"yes")
 
 # Checks for programs.
 AC_PROG_CXX
index 892eb6682a70e7cf37410a49b729797ed1d0e29f..68f69bf0289225b686c4da1d668b4557a3499136 100644 (file)
@@ -11,7 +11,6 @@ noinst_HEADERS += \
        msg/SimplePolicyMessenger.h \
        msg/msg_types.h
 
-# simple
 libmsg_la_SOURCES += \
        msg/simple/Accepter.cc \
        msg/simple/DispatchQueue.cc \
@@ -21,8 +20,20 @@ libmsg_la_SOURCES += \
        msg/async/AsyncConnection.cc \
        msg/async/AsyncMessenger.cc \
        msg/async/Event.cc \
-       msg/async/net_handler.cc \
-       msg/async/EventEpoll.cc
+       msg/async/net_handler.cc
+
+if LINUX
+libmsg_la_SOURCES += msg/async/EventEpoll.cc
+endif
+
+if DARWIN
+libmsg_la_SOURCES += msg/async/EventKqueue.cc
+endif
+
+if FREEBSD
+libmsg_la_SOURCES += msg/async/EventKqueue.cc
+endif
+
 
 noinst_HEADERS += \
        msg/simple/Accepter.h \
@@ -33,10 +44,18 @@ noinst_HEADERS += \
        msg/async/AsyncConnection.h \
        msg/async/AsyncMessenger.h \
        msg/async/Event.h \
-       msg/async/EventEpoll.h \
        msg/async/net_handler.h
 
+if LINUX
+libmsg_la_SOURCES += msg/async/EventEpoll.h
+endif
 
-noinst_LTLIBRARIES += libmsg.la
+if DARWIN
+libmsg_la_SOURCES += msg/async/EventKqueue.h
+endif
 
+if FREEBSD
+libmsg_la_SOURCES += msg/async/EventKqueue.h
+endif
 
+noinst_LTLIBRARIES += libmsg.la
diff --git a/src/msg/async/EventKqueue.cc b/src/msg/async/EventKqueue.cc
new file mode 100644 (file)
index 0000000..0c1b109
--- /dev/null
@@ -0,0 +1,124 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "common/errno.h"
+#include "EventKqueue.h"
+
+#define dout_subsys ceph_subsys_ms
+
+#undef dout_prefix
+#define dout_prefix *_dout << "KqueueDriver."
+
+int KqueueDriver::init(int nevent)
+{
+  events = (struct kevent*)malloc(sizeof(struct kevent)*nevent);
+  if (!events) {
+    lderr(cct) << __func__ << " unable to malloc memory: "
+                           << cpp_strerror(errno) << dendl;
+    return -errno;
+  }
+  memset(events, 0, sizeof(struct kevent)*nevent);
+
+  kqfd = kqueue();
+  if (kqfd < 0) {
+    lderr(cct) << __func__ << " unable to do kqueue: "
+                           << cpp_strerror(errno) << dendl;
+    return -errno;
+  }
+
+  size = nevent;
+
+  return 0;
+}
+
+int KqueueDriver::add_event(int fd, int cur_mask, int add_mask)
+{
+  struct kevent ke;
+  int filter = 0;
+  filter |= add_mask & EVENT_READABLE ? EVFILT_READ : 0;
+  filter |= add_mask & EVENT_WRITABLE ? EVFILT_WRITE : 0;
+
+  if (filter) {
+    EV_SET(&ke, fd, filter, EV_ADD, 0, 0, NULL);
+    if (kevent(kqfd, &ke, 1, NULL, 0, NULL) == -1) {
+      lderr(cct) << __func__ << " unable to add event: "
+                             << cpp_strerror(errno) << dendl;
+      return -1;
+    }
+  }
+
+  ldout(cct, 10) << __func__ << " add event to fd=" << fd << " mask=" << filter
+                 << dendl;
+  return 0;
+}
+
+void KqueueDriver::del_event(int fd, int cur_mask, int delmask)
+{
+  struct kevent ee;
+  struct kevent ke;
+  int filter = 0;
+  filter |= delmask & EVENT_READABLE ? EVFILT_READ : 0;
+  filter |= delmask & EVENT_WRITABLE ? EVFILT_WRITE : 0;
+
+  if (filter) {
+    EV_SET(&ke, fd, filter, EV_DELETE, 0, 0, NULL);
+    if (kevent(kqfd, &ke, 1, NULL, 0, NULL) < 0) {
+      lderr(cct) << __func__ << " kevent: delete fd=" << fd << " mask=" << filter
+                 << " failed." << cpp_strerror(errno) << dendl;
+    }
+  }
+  ldout(cct, 10) << __func__ << " del event fd=" << fd << " cur mask=" << filter
+                 << dendl;
+}
+
+int KqueueDriver::resize_events(int newsize)
+{
+  return 0;
+}
+
+int KqueueDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tvp)
+{
+  int retval, numevents = 0;
+  struct timespec timeout;
+  timeout.tv_sec = tvp->tv_sec;
+  timeout.tv_nsec = tvp->tv_usec * 1000;
+
+  if (tvp != NULL) {
+      timeout.tv_sec = tvp->tv_sec;
+      timeout.tv_nsec = tvp->tv_usec * 1000;
+      retval = kevent(kqfd, NULL, 0, events, size, &timeout);
+  } else {
+      retval = kevent(kqfd, NULL, 0, events, size, NULL);
+  }
+
+  if (retval > 0) {
+    int j;
+
+    numevents = retval;
+    fired_events.resize(numevents);
+    for (j = 0; j < numevents; j++) {
+      int mask = 0;
+      struct kevent *e = events + j;
+
+      if (e->filter == EVFILT_READ) mask |= EVENT_READABLE;
+      if (e->filter == EVFILT_WRITE) mask |= EVENT_WRITABLE;
+      fired_events[j].fd = (int)e->ident;
+      fired_events[j].mask = mask;
+
+    }
+  }
+  return numevents;
+}
diff --git a/src/msg/async/EventKqueue.h b/src/msg/async/EventKqueue.h
new file mode 100644 (file)
index 0000000..aa5fc28
--- /dev/null
@@ -0,0 +1,48 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSG_EVENTKQUEUE_H
+#define CEPH_MSG_EVENTKQUEUE_H
+
+#include <sys/event.h>
+#include <unistd.h>
+
+#include "Event.h"
+
+class KqueueDriver : public EventDriver {
+  int kqfd;
+  struct kevent *events;
+  CephContext *cct;
+  int size;
+
+ public:
+  KqueueDriver(CephContext *c): kqfd(-1), events(NULL), cct(c) {}
+  virtual ~KqueueDriver() {
+    if (kqfd != -1)
+      close(kqfd);
+
+    if (events)
+      free(events);
+  }
+
+  int init(int nevent);
+  int add_event(int fd, int cur_mask, int add_mask);
+  void del_event(int fd, int cur_mask, int del_mask);
+  int resize_events(int newsize);
+  int event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tp);
+};
+
+#endif