From: Raf Lopez Date: Fri, 3 Jun 2022 04:28:16 +0000 (+0000) Subject: msg: add new async event driver based on poll() X-Git-Tag: v18.1.0~874^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=08f5a028f8912b4148e7a779a7edba56a1015c4b;p=ceph.git msg: add new async event driver based on poll() Driver to replace select() where useful; currently that means Windows clients, as select() is the only other driver available there. On Windows, select() is limited by the FD_SETSIZE hard limit of 64 descriptors. This driver uses poll() or WSAPoll() and maintains pollfd structures to overcome select() limitations. Fixes: https://tracker.ceph.com/issues/55840 Signed-off-by: Rafael Lopez --- diff --git a/src/msg/CMakeLists.txt b/src/msg/CMakeLists.txt index 2aa80c58b900b..e8cc3fdabdf67 100644 --- a/src/msg/CMakeLists.txt +++ b/src/msg/CMakeLists.txt @@ -29,6 +29,11 @@ elseif(FREEBSD OR APPLE) async/EventKqueue.cc) endif(LINUX) +if(WIN32) + list(APPEND msg_srcs + async/EventPoll.cc) +endif(WIN32) + if(HAVE_RDMA) list(APPEND msg_srcs async/rdma/Infiniband.cc diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index 2c545c07b3744..4662e42bd144d 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -28,9 +28,13 @@ #ifdef HAVE_KQUEUE #include "EventKqueue.h" #else +#ifdef HAVE_POLL +#include "EventPoll.h" +#else #include "EventSelect.h" #endif #endif +#endif #define dout_subsys ceph_subsys_ms @@ -122,9 +126,13 @@ int EventCenter::init(int nevent, unsigned center_id, const std::string &type) #else #ifdef HAVE_KQUEUE driver = new KqueueDriver(cct); +#else +#ifdef HAVE_POLL + driver = new PollDriver(cct); #else driver = new SelectDriver(cct); #endif +#endif #endif } diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index 1812db3cd9cfa..753a7c699f25b 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -30,6 +30,10 @@ #define HAVE_KQUEUE 1 #endif +#ifdef _WIN32 +#define HAVE_POLL 1 +#endif + #ifdef __sun #include #ifdef 
_DTRACE_VERSION
diff --git a/src/msg/async/EventPoll.cc b/src/msg/async/EventPoll.cc
new file mode 100644
index 0000000000000..d3b3c22482c03
--- /dev/null
+++ b/src/msg/async/EventPoll.cc
@@ -0,0 +1,224 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2022 Rafael Lopez
+ *
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "common/errno.h"
+#include "EventPoll.h"
+
+#include <unistd.h>
+#include <cstdlib>
+#define dout_subsys ceph_subsys_ms
+
+#undef dout_prefix
+#define dout_prefix *_dout << "PollDriver."
+
+// Operation codes understood by poll_ctl(). Each one is guarded on its own
+// so that a pre-existing definition of one does not suppress the others.
+#ifndef POLL_ADD
+#define POLL_ADD 1
+#endif
+#ifndef POLL_MOD
+#define POLL_MOD 2
+#endif
+#ifndef POLL_DEL
+#define POLL_DEL 3
+#endif
+
+// Release the pollfd array allocated by init() and grown by poll_ctl().
+PollDriver::~PollDriver()
+{
+  free(pfds);
+}
+
+// Allocate the initial pollfd array and mark every slot as unused (fd == -1).
+// Returns 0 on success or -ENOMEM if the array cannot be allocated.
+int PollDriver::init(EventCenter *c, int nevent)
+{
+  // pfds array will auto scale up to hard_max_pfds, which should be
+  // greater than total daemons/op_threads (todo: cfg option?)
+  hard_max_pfds = 8192;
+  // 128 seems a good starting point, cover clusters up to ~350 OSDs
+  // with default ms_async_op_threads
+  max_pfds = 128;
+
+  pfds = static_cast<POLLFD*>(calloc(max_pfds, sizeof(POLLFD)));
+  if (!pfds) {
+    lderr(cct) << __func__ << " unable to allocate memory " << dendl;
+    return -ENOMEM;
+  }
+
+  // initialise pfds
+  for (int i = 0; i < max_pfds; i++) {
+    pfds[i].fd = -1;
+    pfds[i].events = 0;
+    pfds[i].revents = 0;
+  }
+  return 0;
+}
+
+// Helper func to register/unregister interest in a FD's events by
+// manipulating its entry in the pfds array.
+//  POLL_ADD: claim a free slot for fd, doubling the array while it is
+//            still smaller than hard_max_pfds when no slot is free
+//  POLL_MOD: replace the event mask of the slot holding fd
+//  POLL_DEL: mark the slot holding fd as unused
+// Returns 0 on success (also when MOD/DEL find no slot for fd), -ENOMEM if
+// the array cannot grow, or -EMFILE once hard_max_pfds has been reached.
+int PollDriver::poll_ctl(int fd, int op, int events)
+{
+  int pos = 0;
+  if (op == POLL_ADD) {
+    // Find an empty pollfd slot
+    for (pos = 0; pos < max_pfds; pos++) {
+      if (pfds[pos].fd == -1) {
+        pfds[pos].fd = fd;
+        pfds[pos].events = events;
+        pfds[pos].revents = 0;
+        return 0;
+      }
+    }
+    // We ran out of slots, try to increase
+    if (max_pfds < hard_max_pfds) {
+      const int new_max = max_pfds * 2;
+      ldout(cct, 10) << __func__ << " exhausted pollfd structure slots"
+                     << ", doubling to " << new_max << dendl;
+      // Keep the old pointer until realloc succeeds: on failure the original
+      // array is still valid and must stay reachable instead of being leaked.
+      POLLFD *grown =
+        static_cast<POLLFD*>(realloc(pfds, new_max * sizeof(POLLFD)));
+      if (!grown) {
+        lderr(cct) << __func__ << " unable to realloc for more pollfd structures"
+                   << dendl;
+        return -ENOMEM;
+      }
+      pfds = grown;
+      // Initialise new slots
+      for (int i = max_pfds; i < new_max; i++) {
+        pfds[i].fd = -1;
+        pfds[i].events = 0;
+        pfds[i].revents = 0;
+      }
+      // pos == old max_pfds here, i.e. the first of the new slots
+      max_pfds = new_max;
+      pfds[pos].fd = fd;
+      pfds[pos].events = events;
+      pfds[pos].revents = 0;
+      return 0;
+    } else {
+      // Hit hard limit
+      lderr(cct) << __func__ << " hard limit for file descriptors per op"
+                 << " thread reached (" << hard_max_pfds << ")" << dendl;
+      return -EMFILE;
+    }
+  } else if (op == POLL_MOD) {
+    for (pos = 0; pos < max_pfds; pos++) {
+      if (pfds[pos].fd == fd) {
+        pfds[pos].events = events;
+        return 0;
+      }
+    }
+  } else if (op == POLL_DEL) {
+    for (pos = 0; pos < max_pfds; pos++) {
+      if (pfds[pos].fd == fd) {
+        pfds[pos].fd = -1;
+        pfds[pos].events = 0;
+        return 0;
+      }
+    }
+  }
+  return 0;
+}
+
+// Start watching fd for the events in add_mask, merged with the events in
+// cur_mask that are already being watched. Returns poll_ctl()'s result.
+int PollDriver::add_event(int fd, int cur_mask, int add_mask)
+{
+  ldout(cct, 10) << __func__ << " add event to fd=" << fd << " mask=" << add_mask
+                 << dendl;
+  int op, events = 0;
+  op = cur_mask == EVENT_NONE ? POLL_ADD : POLL_MOD;
+
+  add_mask |= cur_mask; /* Merge old events */
+  if (add_mask & EVENT_READABLE)
+    events |= POLLIN;
+  if (add_mask & EVENT_WRITABLE)
+    events |= POLLOUT;
+  return poll_ctl(fd, op, events);
+}
+
+// Stop watching fd for the events in delmask; the slot is released once no
+// events remain. Returns poll_ctl()'s result.
+int PollDriver::del_event(int fd, int cur_mask, int delmask)
+{
+  ldout(cct, 10) << __func__ << " del event fd=" << fd << " cur mask=" << cur_mask
+                 << dendl;
+  int op, events = 0;
+  int mask = cur_mask & (~delmask);
+
+  if (mask != EVENT_NONE) {
+    op = POLL_MOD;
+    if (mask & EVENT_READABLE)
+      events |= POLLIN;
+    if (mask & EVENT_WRITABLE)
+      events |= POLLOUT;
+  } else {
+    op = POLL_DEL;
+  }
+  return poll_ctl(fd, op, events);
+}
+
+// Nothing to do: the pollfd array is grown on demand by poll_ctl().
+int PollDriver::resize_events(int newsize)
+{
+  return 0;
+}
+
+// Wait for events on the registered fds and append the ones that fired to
+// fired_events. tvp is the timeout; a null tvp blocks indefinitely.
+// Returns the number of entries appended.
+int PollDriver::event_wait(std::vector<FiredFileEvent> &fired_events, struct timeval *tvp)
+{
+  int retval, numevents = 0;
+  const int timeout_ms =
+    tvp ? static_cast<int>(tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1;
+#ifdef _WIN32
+  retval = WSAPoll(pfds, max_pfds, timeout_ms);
+#else
+  retval = poll(pfds, max_pfds, timeout_ms);
+#endif
+  if (retval > 0) {
+    for (int j = 0; j < max_pfds; j++) {
+      if (pfds[j].fd != -1) {
+        int mask = 0;
+        struct FiredFileEvent fe;
+        if (pfds[j].revents & POLLIN)
+          mask |= EVENT_READABLE;
+        if (pfds[j].revents & POLLOUT)
+          mask |= EVENT_WRITABLE;
+        // Errors and hang-ups are reported in revents even though they are
+        // never requested in events; without this mapping such fds would
+        // never be reported to the caller.
+        if (pfds[j].revents & (POLLERR | POLLHUP))
+          mask |= EVENT_READABLE | EVENT_WRITABLE;
+        if (mask) {
+          fe.fd = pfds[j].fd;
+          fe.mask = mask;
+          fired_events.push_back(fe);
+          numevents++;
+        }
+      }
+    }
+  }
+  return numevents;
+}
diff --git a/src/msg/async/EventPoll.h b/src/msg/async/EventPoll.h
new file mode 100644
index 0000000000000..866990c8517e4
--- /dev/null
+++ b/src/msg/async/EventPoll.h
@@ -0,0 +1,54 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2022 Rafael Lopez
+ *
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSG_EVENTPOLL_H
+#define CEPH_MSG_EVENTPOLL_H
+
+#include "Event.h"
+#ifdef _WIN32
+#include <winsock2.h>
+#else
+#include <poll.h>
+#endif
+
+typedef struct pollfd POLLFD;
+
+// Event driver built on poll() (WSAPoll() on Windows). Interest in file
+// descriptors is tracked in a heap-allocated pollfd array owned by the driver.
+class PollDriver : public EventDriver {
+  int max_pfds = 0;          // current number of slots in pfds
+  int hard_max_pfds = 0;     // size at which poll_ctl() stops growing pfds
+  POLLFD *pfds = nullptr;    // slot array; fd == -1 marks an unused slot
+  CephContext *cct;
+
+ private:
+  int poll_ctl(int, int, int);
+
+ public:
+  explicit PollDriver(CephContext *c): cct(c) {}
+  ~PollDriver() override;
+  // pfds is an owning raw pointer, so a copy would lead to a double free
+  PollDriver(const PollDriver&) = delete;
+  PollDriver& operator=(const PollDriver&) = delete;
+
+  int init(EventCenter *c, int nevent) override;
+  int add_event(int fd, int cur_mask, int add_mask) override;
+  int del_event(int fd, int cur_mask, int del_mask) override;
+  int resize_events(int newsize) override;
+  int event_wait(std::vector<FiredFileEvent> &fired_events,
+                 struct timeval *tp) override;
+};
+
+#endif