From: Insu Jang Date: Thu, 3 Dec 2020 02:09:25 +0000 (+0900) Subject: msg/async: refine worker creation in NetworkStack X-Git-Tag: v16.1.0~336^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ff65c800b3e1f3f7e3989223b9bde4cbbaf5c076;p=ceph.git msg/async: refine worker creation in NetworkStack This commit updates NetworkStack::create_worker() to a virtual function to reduce type check redundancy. Since calling a pure virtual function in a constructor method is strictly prohibited, NetworkStack::create_worker() is currently not implemented as a virtual function. It requires duplicated type check with NetworkStack::create(), making the code a little bit tricky. Considering NetworkStack instances can only be instantiated through NetworkStack::create(), this commit moves worker creation out of its constructor to NetworkStack::create(), makes it a pure virtual, and lets inherited classes implement it to remove the type check redundancy. By making it a non-static, we can also reduce unnecessary class member function exposure. Signed-off-by: Insu Jang --- diff --git a/src/msg/async/PosixStack.h b/src/msg/async/PosixStack.h index 4aed9dd6444..e1d85f1f4b7 100644 --- a/src/msg/async/PosixStack.h +++ b/src/msg/async/PosixStack.h @@ -40,6 +40,10 @@ class PosixWorker : public Worker { class PosixNetworkStack : public NetworkStack { std::vector threads; + virtual Worker* create_worker(CephContext *c, unsigned worker_id) override { + return new PosixWorker(c, worker_id); + } + public: explicit PosixNetworkStack(CephContext *c, const std::string &t); diff --git a/src/msg/async/Stack.cc b/src/msg/async/Stack.cc index 6885fd4f9db..bf1cd8203b6 100644 --- a/src/msg/async/Stack.cc +++ b/src/msg/async/Stack.cc @@ -66,47 +66,42 @@ std::function NetworkStack::add_thread(unsigned worker_id) std::shared_ptr NetworkStack::create(CephContext *c, const std::string &t) { + std::shared_ptr stack = nullptr; + if (t == "posix") - return std::make_shared(c, t); + stack.reset(new PosixNetworkStack(c, t)); #ifdef HAVE_RDMA else if (t == "rdma") - return std::make_shared(c, t); + stack.reset(new RDMAStack(c, t)); #endif #ifdef HAVE_DPDK else if (t == "dpdk") - return std::make_shared(c, t); + stack.reset(new DPDKStack(c, t)); #endif - lderr(c) << __func__ << " ms_async_transport_type " << t << + if (stack == nullptr) { + lderr(c) << __func__ << " ms_async_transport_type " << t << " is not supported! " << dendl; - ceph_abort(); - return nullptr; -} - -Worker* NetworkStack::create_worker(CephContext *c, const std::string &type, unsigned worker_id) -{ - if (type == "posix") - return new PosixWorker(c, worker_id); -#ifdef HAVE_RDMA - else if (type == "rdma") - return new RDMAWorker(c, worker_id); -#endif -#ifdef HAVE_DPDK - else if (type == "dpdk") - return new DPDKWorker(c, worker_id); -#endif + ceph_abort(); + return nullptr; + } + + const int InitEventNumber = 5000; + for (unsigned worker_id = 0; worker_id < stack->num_workers; ++worker_id) { + Worker *w = stack->create_worker(c, worker_id); + int ret = w->center.init(InitEventNumber, worker_id, t); + if (ret) + throw std::system_error(-ret, std::generic_category()); + stack->workers.push_back(w); + } - lderr(c) << __func__ << " ms_async_transport_type " << type << - " is not supported! " << dendl; - ceph_abort(); - return nullptr; + return stack; } NetworkStack::NetworkStack(CephContext *c, const std:: string &t): type(t), started(false), cct(c) { ceph_assert(cct->_conf->ms_async_op_threads > 0); - const int InitEventNumber = 5000; num_workers = cct->_conf->ms_async_op_threads; if (num_workers >= EventCenter::MAX_EVENTCENTER) { ldout(cct, 0) << __func__ << " max thread limit is " @@ -115,14 +110,6 @@ NetworkStack::NetworkStack(CephContext *c, const std:: string &t): type(t), star << dendl; num_workers = EventCenter::MAX_EVENTCENTER; } - - for (unsigned worker_id = 0; worker_id < num_workers; ++worker_id) { - Worker *w = create_worker(cct, type, worker_id); - int ret = w->center.init(InitEventNumber, worker_id, type); - if (ret) - throw std::system_error(-ret, std::generic_category()); - workers.push_back(w); - } } void NetworkStack::start() diff --git a/src/msg/async/Stack.h b/src/msg/async/Stack.h index 7b8b62f36fb..1bf4baa519c 100644 --- a/src/msg/async/Stack.h +++ b/src/msg/async/Stack.h @@ -300,6 +300,8 @@ class NetworkStack { std::function add_thread(unsigned i); + virtual Worker* create_worker(CephContext *c, unsigned i) = 0; + protected: CephContext *cct; std::vector workers; @@ -316,8 +318,6 @@ class NetworkStack { static std::shared_ptr create( CephContext *c, const std::string &type); - static Worker* create_worker( - CephContext *c, const std::string &t, unsigned i); // backend need to override this method if backend doesn't support shared // listen table. // For example, posix backend has in kernel global listen table. If one diff --git a/src/msg/async/dpdk/DPDKStack.h b/src/msg/async/dpdk/DPDKStack.h index 37626bee492..43ae8003c39 100644 --- a/src/msg/async/dpdk/DPDKStack.h +++ b/src/msg/async/dpdk/DPDKStack.h @@ -248,6 +248,11 @@ class DPDKWorker : public Worker { class DPDKStack : public NetworkStack { vector > funcs; + + virtual Worker* create_worker(CephContext *c, unsigned worker_id) override { + return new DPDKWorker(c, worker_id); + } + public: explicit DPDKStack(CephContext *cct, const string &t): NetworkStack(cct, t) { funcs.resize(cct->_conf->ms_async_max_op_threads); diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index ac33fcae8e7..d2eafb74c14 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -326,6 +326,10 @@ class RDMAStack : public NetworkStack { std::atomic fork_finished = {false}; + virtual Worker* create_worker(CephContext *c, unsigned worker_id) override { + return new RDMAWorker(c, worker_id); + } + public: explicit RDMAStack(CephContext *cct, const std::string &t); virtual ~RDMAStack();