msg/SimplePolicyMessenger.h \
msg/msg_types.h
-# simple
libmsg_la_SOURCES += \
msg/simple/Accepter.cc \
msg/simple/DispatchQueue.cc \
msg/async/AsyncConnection.cc \
msg/async/AsyncMessenger.cc \
msg/async/Event.cc \
- msg/async/net_handler.cc \
- msg/async/EventEpoll.cc
+ msg/async/net_handler.cc
+
+if LINUX
+libmsg_la_SOURCES += msg/async/EventEpoll.cc
+endif
+
+if DARWIN
+libmsg_la_SOURCES += msg/async/EventKqueue.cc
+endif
+
+if FREEBSD
+libmsg_la_SOURCES += msg/async/EventKqueue.cc
+endif
+
noinst_HEADERS += \
msg/simple/Accepter.h \
msg/async/AsyncConnection.h \
msg/async/AsyncMessenger.h \
msg/async/Event.h \
- msg/async/EventEpoll.h \
msg/async/net_handler.h
+if LINUX
+libmsg_la_SOURCES += msg/async/EventEpoll.h
+endif
-noinst_LTLIBRARIES += libmsg.la
+if DARWIN
+libmsg_la_SOURCES += msg/async/EventKqueue.h
+endif
+if FREEBSD
+libmsg_la_SOURCES += msg/async/EventKqueue.h
+endif
+noinst_LTLIBRARIES += libmsg.la
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "common/errno.h"
+#include "EventKqueue.h"
+
+#define dout_subsys ceph_subsys_ms
+
+#undef dout_prefix
+#define dout_prefix *_dout << "KqueueDriver."
+
+int KqueueDriver::init(int nevent)
+{
+ events = (struct kevent*)malloc(sizeof(struct kevent)*nevent);
+ if (!events) {
+ lderr(cct) << __func__ << " unable to malloc memory: "
+ << cpp_strerror(errno) << dendl;
+ return -errno;
+ }
+ memset(events, 0, sizeof(struct kevent)*nevent);
+
+ kqfd = kqueue();
+ if (kqfd < 0) {
+ lderr(cct) << __func__ << " unable to do kqueue: "
+ << cpp_strerror(errno) << dendl;
+ return -errno;
+ }
+
+ size = nevent;
+
+ return 0;
+}
+
+int KqueueDriver::add_event(int fd, int cur_mask, int add_mask)
+{
+ struct kevent ke;
+ int filter = 0;
+ filter |= add_mask & EVENT_READABLE ? EVFILT_READ : 0;
+ filter |= add_mask & EVENT_WRITABLE ? EVFILT_WRITE : 0;
+
+ if (filter) {
+ EV_SET(&ke, fd, filter, EV_ADD, 0, 0, NULL);
+ if (kevent(kqfd, &ke, 1, NULL, 0, NULL) == -1) {
+ lderr(cct) << __func__ << " unable to add event: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ }
+
+ ldout(cct, 10) << __func__ << " add event to fd=" << fd << " mask=" << filter
+ << dendl;
+ return 0;
+}
+
+void KqueueDriver::del_event(int fd, int cur_mask, int delmask)
+{
+ struct kevent ee;
+ struct kevent ke;
+ int filter = 0;
+ filter |= delmask & EVENT_READABLE ? EVFILT_READ : 0;
+ filter |= delmask & EVENT_WRITABLE ? EVFILT_WRITE : 0;
+
+ if (filter) {
+ EV_SET(&ke, fd, filter, EV_DELETE, 0, 0, NULL);
+ if (kevent(kqfd, &ke, 1, NULL, 0, NULL) < 0) {
+ lderr(cct) << __func__ << " kevent: delete fd=" << fd << " mask=" << filter
+ << " failed." << cpp_strerror(errno) << dendl;
+ }
+ }
+ ldout(cct, 10) << __func__ << " del event fd=" << fd << " cur mask=" << filter
+ << dendl;
+}
+
+int KqueueDriver::resize_events(int newsize)
+{
+ return 0;
+}
+
+int KqueueDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tvp)
+{
+ int retval, numevents = 0;
+ struct timespec timeout;
+ timeout.tv_sec = tvp->tv_sec;
+ timeout.tv_nsec = tvp->tv_usec * 1000;
+
+ if (tvp != NULL) {
+ timeout.tv_sec = tvp->tv_sec;
+ timeout.tv_nsec = tvp->tv_usec * 1000;
+ retval = kevent(kqfd, NULL, 0, events, size, &timeout);
+ } else {
+ retval = kevent(kqfd, NULL, 0, events, size, NULL);
+ }
+
+ if (retval > 0) {
+ int j;
+
+ numevents = retval;
+ fired_events.resize(numevents);
+ for (j = 0; j < numevents; j++) {
+ int mask = 0;
+ struct kevent *e = events + j;
+
+ if (e->filter == EVFILT_READ) mask |= EVENT_READABLE;
+ if (e->filter == EVFILT_WRITE) mask |= EVENT_WRITABLE;
+ fired_events[j].fd = (int)e->ident;
+ fired_events[j].mask = mask;
+
+ }
+ }
+ return numevents;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSG_EVENTKQUEUE_H
+#define CEPH_MSG_EVENTKQUEUE_H
+
+#include <sys/event.h>
+#include <unistd.h>
+
+#include "Event.h"
+
+class KqueueDriver : public EventDriver {
+ int kqfd;
+ struct kevent *events;
+ CephContext *cct;
+ int size;
+
+ public:
+ KqueueDriver(CephContext *c): kqfd(-1), events(NULL), cct(c) {}
+ virtual ~KqueueDriver() {
+ if (kqfd != -1)
+ close(kqfd);
+
+ if (events)
+ free(events);
+ }
+
+ int init(int nevent);
+ int add_event(int fd, int cur_mask, int add_mask);
+ void del_event(int fd, int cur_mask, int del_mask);
+ int resize_events(int newsize);
+ int event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tp);
+};
+
+#endif