]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg: add new async event driver based on poll()
authorRaf Lopez <rafael.lopez@softiron.com>
Fri, 3 Jun 2022 04:28:16 +0000 (04:28 +0000)
committerRafael Lopez <rafael.lopez@softiron.com>
Mon, 6 Jun 2022 01:58:44 +0000 (01:58 +0000)
Driver to replace select() where useful, currently this is
windows clients as select is the only available driver for it.
Windows is limited by the FD_SETSIZE hard limit of 64
descriptors. This driver Uses poll() or WSAPoll() and maintains
pollfd structures to overcome select() limitations.

Fixes: https://tracker.ceph.com/issues/55840
Signed-off-by: Rafael Lopez <rafael.lopez@softiron.com>
src/msg/CMakeLists.txt
src/msg/async/Event.cc
src/msg/async/Event.h
src/msg/async/EventPoll.cc [new file with mode: 0644]
src/msg/async/EventPoll.h [new file with mode: 0644]

index 2aa80c58b900b41ff46c45a453542b399bb31f3e..e8cc3fdabdf675e9dd93be5c54045a381d3fa751 100644 (file)
@@ -29,6 +29,11 @@ elseif(FREEBSD OR APPLE)
     async/EventKqueue.cc)
 endif(LINUX)
 
+if(WIN32)
+  list(APPEND msg_srcs
+    async/EventPoll.cc)
+endif(WIN32)
+
 if(HAVE_RDMA)
   list(APPEND msg_srcs
     async/rdma/Infiniband.cc
index 2c545c07b37445991290d18de7b9539f043099b8..4662e42bd144dae1ca1b359152ba17a4eb108af4 100644 (file)
 #ifdef HAVE_KQUEUE
 #include "EventKqueue.h"
 #else
+#ifdef HAVE_POLL
+#include "EventPoll.h"
+#else
 #include "EventSelect.h"
 #endif
 #endif
+#endif
 
 #define dout_subsys ceph_subsys_ms
 
@@ -122,9 +126,13 @@ int EventCenter::init(int nevent, unsigned center_id, const std::string &type)
 #else
 #ifdef HAVE_KQUEUE
   driver = new KqueueDriver(cct);
+#else
+#ifdef HAVE_POLL
+  driver = new PollDriver(cct);
 #else
   driver = new SelectDriver(cct);
 #endif
+#endif
 #endif
   }
 
index 1812db3cd9cfa0153c08e7440b1ccd25f03e7ae7..753a7c699f25bed6e64cdab7e812503659ca8521 100644 (file)
 #define HAVE_KQUEUE 1
 #endif
 
+#ifdef _WIN32
+#define HAVE_POLL 1
+#endif
+
 #ifdef __sun
 #include <sys/feature_tests.h>
 #ifdef _DTRACE_VERSION
diff --git a/src/msg/async/EventPoll.cc b/src/msg/async/EventPoll.cc
new file mode 100644 (file)
index 0000000..d3b3c22
--- /dev/null
@@ -0,0 +1,190 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2022 Rafael Lopez <rafael.lopez@softiron.com>
+ *
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "common/errno.h"
+#include "EventPoll.h"
+
+#include <unistd.h>
+#define dout_subsys ceph_subsys_ms
+
+#undef dout_prefix
+#define dout_prefix *_dout << "PollDriver."
+
+#ifndef POLL_ADD
+#define POLL_ADD 1
+#ifndef POLL_MOD
+#define POLL_MOD 2
+#ifndef POLL_DEL
+#define POLL_DEL 3
+#endif
+#endif
+#endif
+
+int PollDriver::init(EventCenter *c, int nevent)
+{
+  // pfds array will auto scale up to hard_max_pfds, which should be
+  // greater than total daemons/op_threads (todo: cfg option?)
+  hard_max_pfds = 8192;
+  // 128 seems a good starting point, cover clusters up to ~350 OSDs
+  // with default ms_async_op_threads
+  max_pfds = 128;
+
+  pfds = (POLLFD*)calloc(max_pfds, sizeof(POLLFD));
+  if (!pfds) {
+    lderr(cct) << __func__ << " unable to allocate memory " << dendl;
+    return -ENOMEM;
+  }
+
+  //initialise pfds
+  for(int i = 0; i < max_pfds; i++){
+    pfds[i].fd = -1;
+    pfds[i].events = 0;
+    pfds[i].revents = 0;
+  }
+  return 0;
+}
+
+// Helper func to register/unregister interest in a FD's events by
+// manipulating it's entry in pfds array
+int PollDriver::poll_ctl(int fd, int op, int events) 
+{
+  int pos = 0;
+  if (op == POLL_ADD) {
+    // Find an empty pollfd slot
+    for(pos = 0; pos < max_pfds ; pos++){
+      if(pfds[pos].fd == -1){
+        pfds[pos].fd = fd;
+        pfds[pos].events = events;
+        pfds[pos].revents = 0;
+        return 0;
+      }
+    }
+    // We ran out of slots, try to increase
+    if (max_pfds < hard_max_pfds) {
+      ldout(cct, 10) << __func__ << " exhausted pollfd structure slots"
+                     << ", doubling to " << max_pfds*2 << dendl;
+      pfds = (POLLFD*)realloc(pfds, max_pfds*2*sizeof(POLLFD));
+      if (!pfds) {
+        lderr(cct) << __func__ << " unable to realloc for more pollfd structures"
+                   << dendl;
+        return -ENOMEM;
+      }
+      // Initialise new slots
+      for (int i = max_pfds ; i < max_pfds*2 ; i++){
+        pfds[i].fd = -1;
+        pfds[i].events = 0;
+        pfds[i].revents = 0;
+      }
+      max_pfds = max_pfds*2;
+      pfds[pos].fd = fd;
+      pfds[pos].events = events;
+      pfds[pos].revents = 0;
+      return 0;
+    } else {
+    // Hit hard limit
+    lderr(cct) << __func__ << " hard limit for file descriptors per op" 
+               << " thread reached (" << hard_max_pfds << ")" << dendl;
+    return -EMFILE;
+    }
+  } else if (op == POLL_MOD) {
+    for (pos = 0; pos < max_pfds; pos++ ){
+      if (pfds[pos].fd == fd) {
+        pfds[pos].events = events;
+        return 0;
+      }
+    }
+  } else if (op == POLL_DEL) {
+    for (pos = 0; pos < max_pfds; pos++ ){
+      if (pfds[pos].fd == fd) {
+        pfds[pos].fd = -1;
+        pfds[pos].events = 0;
+        return 0;
+      }
+    }
+  }
+  return 0;
+}
+
+int PollDriver::add_event(int fd, int cur_mask, int add_mask)
+{
+  ldout(cct, 10) << __func__ << " add event to fd=" << fd << " mask=" << add_mask
+                 << dendl;
+  int op, events = 0;
+  op = cur_mask == EVENT_NONE ? POLL_ADD: POLL_MOD;
+
+  add_mask |= cur_mask; /* Merge old events */
+  if (add_mask & EVENT_READABLE)
+    events |= POLLIN;
+  if (add_mask & EVENT_WRITABLE)
+    events |= POLLOUT;
+  int ret = poll_ctl(fd, op, events);
+  return ret;
+}
+
+int PollDriver::del_event(int fd, int cur_mask, int delmask)
+{
+  ldout(cct, 10) << __func__ << " del event fd=" << fd << " cur mask=" << cur_mask
+                 << dendl;
+  int op, events = 0;
+  int mask = cur_mask & (~delmask);
+
+  if (mask != EVENT_NONE) {
+    op = POLL_MOD;
+    if (mask & EVENT_READABLE)
+      events |= POLLIN;
+    if (mask & EVENT_WRITABLE)
+      events |= POLLOUT;
+  } else {
+    op = POLL_DEL;
+  }
+  poll_ctl(fd, op, events);
+  return 0;
+}
+
+int PollDriver::resize_events(int newsize)
+{
+  return 0;
+}
+
+int PollDriver::event_wait(std::vector<FiredFileEvent> &fired_events, struct timeval *tvp)
+{
+  int retval, numevents = 0;
+#ifdef _WIN32
+  retval = WSAPoll(pfds, max_pfds,
+                      tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
+#else
+  retval = poll(pfds, max_pfds,
+                      tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
+#endif
+  if (retval > 0) {
+    for (int j = 0; j < max_pfds; j++) {
+      if (pfds[j].fd != -1) {
+        int mask = 0;
+        struct FiredFileEvent fe;
+        if (pfds[j].revents & POLLIN)
+          mask |= EVENT_READABLE;
+        if (pfds[j].revents & POLLOUT)
+          mask |= EVENT_WRITABLE;
+        if (mask) {
+          fe.fd = pfds[j].fd;
+          fe.mask = mask;
+          fired_events.push_back(fe);
+          numevents++;
+        }
+      }
+    }
+  }
+  return numevents;
+}
diff --git a/src/msg/async/EventPoll.h b/src/msg/async/EventPoll.h
new file mode 100644 (file)
index 0000000..866990c
--- /dev/null
@@ -0,0 +1,49 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2022 Rafael Lopez <rafael.lopez@softiron.com>
+ *
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSG_EVENTPOLL_H
+#define CEPH_MSG_EVENTPOLL_H
+
+#include "Event.h"
+#ifdef _WIN32
+#include <winsock2.h>
+#else
+#include <poll.h>
+#endif
+
+typedef struct pollfd POLLFD;
+
+class PollDriver : public EventDriver {
+  int max_pfds;
+  int hard_max_pfds;
+  POLLFD *pfds;
+  CephContext *cct;
+
+ private:
+  int poll_ctl(int, int, int);
+
+ public:
+  explicit PollDriver(CephContext *c): cct(c) {}
+  ~PollDriver() override {}
+
+  int init(EventCenter *c, int nevent) override;
+  int add_event(int fd, int cur_mask, int add_mask) override;
+  int del_event(int fd, int cur_mask, int del_mask) override;
+  int resize_events(int newsize) override;
+  int event_wait(std::vector<FiredFileEvent> &fired_events,
+                struct timeval *tp) override;
+};
+
+#endif